mirror of
https://github.com/memohai/Memoh.git
synced 2026-04-25 07:00:48 +09:00
43c4153938
* refactor: introduce DCP pipeline layer for unified context assembly
Introduce a Deterministic Context Pipeline (DCP) inspired by Cahciua,
providing event-driven context assembly for LLM conversations.
- Add `internal/pipeline/` package with Canonical Event types, Projection
(reduce), Rendering (XML RC), Pipeline manager, and EventStore persistence
- Change user message format from YAML front-matter to XML `<message>` tags
with self-contained attributes (sender, channel, conversation, type)
- Merge CLI/Web dual API into single `/local/` endpoint, remove CLI handler
- Add `bot_session_events` table for event persistence and cold-start replay
- Add `discuss` session type (reserved for future Cahciua-style mode)
- Wire pipeline into HandleInbound: adapt → persist → push on every message
- Lazy cold-start replay: load events from DB on first session access
* feat: implement discuss mode with reactive driver and probe gate
Add discuss session mode where the bot autonomously decides when to speak
in group chats via tool-gated output (send tool only, no direct text reply).
- Add discuss driver (per-session goroutine, RC watch, step loop via
agent.Generate, TR persistence, late-binding prompt with mention hints)
- Add system_discuss.md prompt template ("text = inner monologue, send = speak")
- Add context composition (MergeContext, ComposeContext, TrimContext) for
RC + assistant/tool message interleaving by timestamp
- Add probe gate: when discuss_probe_model_id is set, cheap model pre-filters
group messages; no tool calls = silence, tool calls = activate primary
- Add /new [chat|discuss] command: explicit mode selection, defaults to
discuss in groups, chat in DMs, chat-only for WebUI
- Add ResolveRunConfig on flow.Resolver for discuss driver to reuse
model/tools/system-prompt resolution without reimplementing
- Fix send tool for discuss mode: same-conversation sends now go through
SendDirect (channel adapter) instead of the local emitter shortcut
- Add target attribute to XML message format (reply_target for routing)
- Add discuss_probe_model_id to bots table settings
- Remove pipeline compaction (SetCompactCursor) — reuse existing compaction.Service
- Persist full SDK messages (including tool calls) in discuss mode
* refactor: unify DCP event layer, fix persistence and local channel
- Fix bot_session_events dedup index to include event_kind so that
message + edit events for the same external_message_id coexist.
- Change CreateSessionEvent from :one to :exec so ON CONFLICT DO NOTHING
does not produce spurious errors on duplicate delivery.
- Move ACL evaluation before event ingest; denied messages no longer
enter bot_session_events or the in-memory pipeline.
- Let chat mode consume RenderedContext from the DCP pipeline when
available, sharing the same event-driven context assembly as discuss.
- Collapse local WebSocket handler to route through HandleInbound
instead of directly calling StreamChatWS, eliminating the dual
business entry point.
- Extract buildBaseRunConfig shared builder so resolve() and
ResolveRunConfig() no longer duplicate model/credentials/skills setup.
- Add StoreRound to RunConfigResolver interface so discuss driver
persists assistant output with full metadata, usage, and memory
extraction (same quality as chat mode).
- Fix discuss driver context: use context.Background() instead of the
short-lived HTTP request context that was getting cancelled.
- Fix model ID passed to StoreRound: return database UUID from
ResolveRunConfig instead of SDK model name.
- Remove dead CLIAdapter/CLIType and update legacy web/cli references
in tests and comments.
* fix: stop idle discuss goroutines after 10min timeout
Discuss session goroutines were never cleaned up when a session became
inactive (e.g. after /new). Add a 10-minute idle timer that auto-exits
the goroutine and removes it from the sessions map when no new RC
arrives.
* refactor: pipeline details — event types, structured reply, display content
- Remove [User sent N attachments] placeholder text from buildInboundQuery;
attachment info is now expressed via pipeline <attachment> tags.
- Unify in-reply-to as structured ReplyRef (Sender/Preview fields) across
Telegram, Discord, Feishu, and Matrix adapters instead of prepending
[Reply to ...] text into the message body. Remove now-unused
buildTelegramQuotedText, buildDiscordQuotedText, buildMatrixQuotedText.
- Make AdaptInbound return CanonicalEvent interface and dispatch to
adaptMessage/adaptEdit/adaptService based on metadata["event_type"].
- Add event_id column to bot_history_messages (migration 0059) so user
messages can reference their canonical pipeline event.
- PersistEvent now returns the event UUID; HandleInbound passes it through
to both persistPassiveMessage and ChatRequest.EventID for storeRound.
- Add FillDisplayContent to message service: extracts plain text from
event_data for clean frontend display.
- Frontend extractMessageText prefers display_content when available,
falling back to legacy strip logic for old messages.
- Fix: always generate headerifiedQuery for storage even when usePipeline
is true, so user messages are persisted via storeRound in chat mode.
* fix: use json.Marshal for pipeline context content serialization
The manual string escaping in buildMessagesFromPipeline only handled
double quotes but not newlines, backslashes, and other JSON special
characters, producing invalid json.RawMessage values. The LLM then
received empty/malformed context and complained about having no history.
* fix: restore WebSocket handler to use StreamChatWS directly
The previous refactoring replaced the WS handler with HandleInbound +
RouteHub subscription, which broke streaming because RouteHub events
use a different format (channel.StreamEvent) than what the frontend
expects (flow.WSStreamEvent with text_delta, tool_call_start, etc.).
Restore the original direct StreamChatWS call path so WebUI streaming
works again. The WS handler now matches the pre-refactoring behavior
while all other changes (pipeline, ACL, event types, etc.) are kept.
* feat: store display_text directly in bot_history_messages
Instead of computing display content at API response time by querying
bot_session_events via event_id, store the raw user text in a dedicated
display_text column at write time. This works for all paths including
the WebSocket handler which does not go through the pipeline/event layer.
- Migration 0060: add display_text TEXT column
- PersistInput gains DisplayText; filled from trimmedText (passive) and
req.Query (storeRound)
- toMessageFields reads display_text into DisplayContent
- Remove FillDisplayContent runtime query and ListSessionEventsByEventID
- Frontend already prefers display_content when available (no change)
* fix: display_text should contain raw user text, not XML-wrapped query
req.Query gets overwritten to headerifiedQuery (with XML <message> tags)
before storeRound runs. Add RawQuery field to ChatRequest to preserve
the original user text, and use it for display_text in storeMessages.
* fix(web): show discuss sessions
* refactor: introduce DCP pipeline layer for unified context assembly
Introduce a Deterministic Context Pipeline (DCP) inspired by Cahciua,
providing event-driven context assembly for LLM conversations.
- Add `internal/pipeline/` package with Canonical Event types, Projection
(reduce), Rendering (XML RC), Pipeline manager, and EventStore persistence
- Change user message format from YAML front-matter to XML `<message>` tags
with self-contained attributes (sender, channel, conversation, type)
- Merge CLI/Web dual API into single `/local/` endpoint, remove CLI handler
- Add `bot_session_events` table for event persistence and cold-start replay
- Add `discuss` session type (reserved for future Cahciua-style mode)
- Wire pipeline into HandleInbound: adapt → persist → push on every message
- Lazy cold-start replay: load events from DB on first session access
* feat: implement discuss mode with reactive driver and probe gate
Add discuss session mode where the bot autonomously decides when to speak
in group chats via tool-gated output (send tool only, no direct text reply).
- Add discuss driver (per-session goroutine, RC watch, step loop via
agent.Generate, TR persistence, late-binding prompt with mention hints)
- Add system_discuss.md prompt template ("text = inner monologue, send = speak")
- Add context composition (MergeContext, ComposeContext, TrimContext) for
RC + assistant/tool message interleaving by timestamp
- Add probe gate: when discuss_probe_model_id is set, cheap model pre-filters
group messages; no tool calls = silence, tool calls = activate primary
- Add /new [chat|discuss] command: explicit mode selection, defaults to
discuss in groups, chat in DMs, chat-only for WebUI
- Add ResolveRunConfig on flow.Resolver for discuss driver to reuse
model/tools/system-prompt resolution without reimplementing
- Fix send tool for discuss mode: same-conversation sends now go through
SendDirect (channel adapter) instead of the local emitter shortcut
- Add target attribute to XML message format (reply_target for routing)
- Add discuss_probe_model_id to bots table settings
- Remove pipeline compaction (SetCompactCursor) — reuse existing compaction.Service
- Persist full SDK messages (including tool calls) in discuss mode
* refactor: unify DCP event layer, fix persistence and local channel
- Fix bot_session_events dedup index to include event_kind so that
message + edit events for the same external_message_id coexist.
- Change CreateSessionEvent from :one to :exec so ON CONFLICT DO NOTHING
does not produce spurious errors on duplicate delivery.
- Move ACL evaluation before event ingest; denied messages no longer
enter bot_session_events or the in-memory pipeline.
- Let chat mode consume RenderedContext from the DCP pipeline when
available, sharing the same event-driven context assembly as discuss.
- Collapse local WebSocket handler to route through HandleInbound
instead of directly calling StreamChatWS, eliminating the dual
business entry point.
- Extract buildBaseRunConfig shared builder so resolve() and
ResolveRunConfig() no longer duplicate model/credentials/skills setup.
- Add StoreRound to RunConfigResolver interface so discuss driver
persists assistant output with full metadata, usage, and memory
extraction (same quality as chat mode).
- Fix discuss driver context: use context.Background() instead of the
short-lived HTTP request context that was getting cancelled.
- Fix model ID passed to StoreRound: return database UUID from
ResolveRunConfig instead of SDK model name.
- Remove dead CLIAdapter/CLIType and update legacy web/cli references
in tests and comments.
* fix: stop idle discuss goroutines after 10min timeout
Discuss session goroutines were never cleaned up when a session became
inactive (e.g. after /new). Add a 10-minute idle timer that auto-exits
the goroutine and removes it from the sessions map when no new RC
arrives.
* refactor: pipeline details — event types, structured reply, display content
- Remove [User sent N attachments] placeholder text from buildInboundQuery;
attachment info is now expressed via pipeline <attachment> tags.
- Unify in-reply-to as structured ReplyRef (Sender/Preview fields) across
Telegram, Discord, Feishu, and Matrix adapters instead of prepending
[Reply to ...] text into the message body. Remove now-unused
buildTelegramQuotedText, buildDiscordQuotedText, buildMatrixQuotedText.
- Make AdaptInbound return CanonicalEvent interface and dispatch to
adaptMessage/adaptEdit/adaptService based on metadata["event_type"].
- Add event_id column to bot_history_messages (migration 0059) so user
messages can reference their canonical pipeline event.
- PersistEvent now returns the event UUID; HandleInbound passes it through
to both persistPassiveMessage and ChatRequest.EventID for storeRound.
- Add FillDisplayContent to message service: extracts plain text from
event_data for clean frontend display.
- Frontend extractMessageText prefers display_content when available,
falling back to legacy strip logic for old messages.
- Fix: always generate headerifiedQuery for storage even when usePipeline
is true, so user messages are persisted via storeRound in chat mode.
* fix: use json.Marshal for pipeline context content serialization
The manual string escaping in buildMessagesFromPipeline only handled
double quotes but not newlines, backslashes, and other JSON special
characters, producing invalid json.RawMessage values. The LLM then
received empty/malformed context and complained about having no history.
* fix: restore WebSocket handler to use StreamChatWS directly
The previous refactoring replaced the WS handler with HandleInbound +
RouteHub subscription, which broke streaming because RouteHub events
use a different format (channel.StreamEvent) than what the frontend
expects (flow.WSStreamEvent with text_delta, tool_call_start, etc.).
Restore the original direct StreamChatWS call path so WebUI streaming
works again. The WS handler now matches the pre-refactoring behavior
while all other changes (pipeline, ACL, event types, etc.) are kept.
* feat: store display_text directly in bot_history_messages
Instead of computing display content at API response time by querying
bot_session_events via event_id, store the raw user text in a dedicated
display_text column at write time. This works for all paths including
the WebSocket handler which does not go through the pipeline/event layer.
- Migration 0060: add display_text TEXT column
- PersistInput gains DisplayText; filled from trimmedText (passive) and
req.Query (storeRound)
- toMessageFields reads display_text into DisplayContent
- Remove FillDisplayContent runtime query and ListSessionEventsByEventID
- Frontend already prefers display_content when available (no change)
* fix: display_text should contain raw user text, not XML-wrapped query
req.Query gets overwritten to headerifiedQuery (with XML <message> tags)
before storeRound runs. Add RawQuery field to ChatRequest to preserve
the original user text, and use it for display_text in storeMessages.
* fix(web): show discuss sessions
* chore(feishu): change discuss output to stream card
* fix(channel): unify discuss/chat send path and card markdown delivery
* feat(discuss): switch to stream execution with RouteHub broadcasting
* refactor(pipeline): remove context trimming from ComposeContext
The pipeline path should not trim context by token budget — the
upstream IC/RC already bounds the event window. Remove TrimContext,
FindWorkingWindowCursor, EstimateTokens, FormatLastProcessedMs (all
unused or only used for trimming), the maxTokens parameter from
ComposeContext, and MaxContextTokens from DiscussSessionConfig.
---------
Co-authored-by: 晨苒 <16112591+chen-ran@users.noreply.github.com>
1385 lines
42 KiB
Go
1385 lines
42 KiB
Go
// Code generated by sqlc. DO NOT EDIT.
|
|
// versions:
|
|
// sqlc v1.30.0
|
|
// source: messages.sql
|
|
|
|
package sqlc
|
|
|
|
import (
|
|
"context"
|
|
|
|
"github.com/jackc/pgx/v5/pgtype"
|
|
)
|
|
|
|
const createMessage = `-- name: CreateMessage :one
|
|
INSERT INTO bot_history_messages (
|
|
bot_id,
|
|
session_id,
|
|
sender_channel_identity_id,
|
|
sender_account_user_id,
|
|
source_message_id,
|
|
source_reply_to_message_id,
|
|
role,
|
|
content,
|
|
metadata,
|
|
usage,
|
|
model_id,
|
|
event_id,
|
|
display_text
|
|
)
|
|
VALUES (
|
|
$1,
|
|
$2::uuid,
|
|
$3::uuid,
|
|
$4::uuid,
|
|
$5::text,
|
|
$6::text,
|
|
$7,
|
|
$8,
|
|
$9,
|
|
$10,
|
|
$11::uuid,
|
|
$12::uuid,
|
|
$13::text
|
|
)
|
|
RETURNING
|
|
id,
|
|
bot_id,
|
|
session_id,
|
|
sender_channel_identity_id,
|
|
sender_account_user_id AS sender_user_id,
|
|
source_message_id AS external_message_id,
|
|
source_reply_to_message_id,
|
|
role,
|
|
content,
|
|
metadata,
|
|
usage,
|
|
event_id,
|
|
display_text,
|
|
created_at
|
|
`
|
|
|
|
type CreateMessageParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
ModelID pgtype.UUID `json:"model_id"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
}
|
|
|
|
type CreateMessageRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
}
|
|
|
|
func (q *Queries) CreateMessage(ctx context.Context, arg CreateMessageParams) (CreateMessageRow, error) {
|
|
row := q.db.QueryRow(ctx, createMessage,
|
|
arg.BotID,
|
|
arg.SessionID,
|
|
arg.SenderChannelIdentityID,
|
|
arg.SenderUserID,
|
|
arg.ExternalMessageID,
|
|
arg.SourceReplyToMessageID,
|
|
arg.Role,
|
|
arg.Content,
|
|
arg.Metadata,
|
|
arg.Usage,
|
|
arg.ModelID,
|
|
arg.EventID,
|
|
arg.DisplayText,
|
|
)
|
|
var i CreateMessageRow
|
|
err := row.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
)
|
|
return i, err
|
|
}
|
|
|
|
const deleteMessagesByBot = `-- name: DeleteMessagesByBot :exec
|
|
DELETE FROM bot_history_messages
|
|
WHERE bot_id = $1
|
|
`
|
|
|
|
func (q *Queries) DeleteMessagesByBot(ctx context.Context, botID pgtype.UUID) error {
|
|
_, err := q.db.Exec(ctx, deleteMessagesByBot, botID)
|
|
return err
|
|
}
|
|
|
|
const deleteMessagesBySession = `-- name: DeleteMessagesBySession :exec
|
|
DELETE FROM bot_history_messages
|
|
WHERE session_id = $1
|
|
`
|
|
|
|
func (q *Queries) DeleteMessagesBySession(ctx context.Context, sessionID pgtype.UUID) error {
|
|
_, err := q.db.Exec(ctx, deleteMessagesBySession, sessionID)
|
|
return err
|
|
}
|
|
|
|
const listActiveMessagesSince = `-- name: ListActiveMessagesSince :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.compact_id,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.bot_id = $1
|
|
AND m.created_at >= $2
|
|
AND (m.metadata->>'trigger_mode' IS NULL OR m.metadata->>'trigger_mode' != 'passive_sync')
|
|
ORDER BY m.created_at ASC
|
|
`
|
|
|
|
type ListActiveMessagesSinceParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
}
|
|
|
|
type ListActiveMessagesSinceRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CompactID pgtype.UUID `json:"compact_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListActiveMessagesSince(ctx context.Context, arg ListActiveMessagesSinceParams) ([]ListActiveMessagesSinceRow, error) {
|
|
rows, err := q.db.Query(ctx, listActiveMessagesSince, arg.BotID, arg.CreatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListActiveMessagesSinceRow
|
|
for rows.Next() {
|
|
var i ListActiveMessagesSinceRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CompactID,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listActiveMessagesSinceBySession = `-- name: ListActiveMessagesSinceBySession :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.compact_id,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.session_id = $1
|
|
AND m.created_at >= $2
|
|
AND (m.metadata->>'trigger_mode' IS NULL OR m.metadata->>'trigger_mode' != 'passive_sync')
|
|
ORDER BY m.created_at ASC
|
|
`
|
|
|
|
type ListActiveMessagesSinceBySessionParams struct {
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
}
|
|
|
|
type ListActiveMessagesSinceBySessionRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CompactID pgtype.UUID `json:"compact_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListActiveMessagesSinceBySession(ctx context.Context, arg ListActiveMessagesSinceBySessionParams) ([]ListActiveMessagesSinceBySessionRow, error) {
|
|
rows, err := q.db.Query(ctx, listActiveMessagesSinceBySession, arg.SessionID, arg.CreatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListActiveMessagesSinceBySessionRow
|
|
for rows.Next() {
|
|
var i ListActiveMessagesSinceBySessionRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CompactID,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessages = `-- name: ListMessages :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.bot_id = $1
|
|
ORDER BY m.created_at ASC
|
|
LIMIT 10000
|
|
`
|
|
|
|
type ListMessagesRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessages(ctx context.Context, botID pgtype.UUID) ([]ListMessagesRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessages, botID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesRow
|
|
for rows.Next() {
|
|
var i ListMessagesRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessagesBefore = `-- name: ListMessagesBefore :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.bot_id = $1
|
|
AND m.created_at < $2
|
|
ORDER BY m.created_at DESC
|
|
LIMIT $3
|
|
`
|
|
|
|
type ListMessagesBeforeParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
MaxCount int32 `json:"max_count"`
|
|
}
|
|
|
|
type ListMessagesBeforeRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessagesBefore(ctx context.Context, arg ListMessagesBeforeParams) ([]ListMessagesBeforeRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessagesBefore, arg.BotID, arg.CreatedAt, arg.MaxCount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesBeforeRow
|
|
for rows.Next() {
|
|
var i ListMessagesBeforeRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessagesBeforeBySession = `-- name: ListMessagesBeforeBySession :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.session_id = $1
|
|
AND m.created_at < $2
|
|
ORDER BY m.created_at DESC
|
|
LIMIT $3
|
|
`
|
|
|
|
type ListMessagesBeforeBySessionParams struct {
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
MaxCount int32 `json:"max_count"`
|
|
}
|
|
|
|
type ListMessagesBeforeBySessionRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessagesBeforeBySession(ctx context.Context, arg ListMessagesBeforeBySessionParams) ([]ListMessagesBeforeBySessionRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessagesBeforeBySession, arg.SessionID, arg.CreatedAt, arg.MaxCount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesBeforeBySessionRow
|
|
for rows.Next() {
|
|
var i ListMessagesBeforeBySessionRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessagesBySession = `-- name: ListMessagesBySession :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.session_id = $1
|
|
ORDER BY m.created_at ASC
|
|
LIMIT 10000
|
|
`
|
|
|
|
type ListMessagesBySessionRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessagesBySession(ctx context.Context, sessionID pgtype.UUID) ([]ListMessagesBySessionRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessagesBySession, sessionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesBySessionRow
|
|
for rows.Next() {
|
|
var i ListMessagesBySessionRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessagesLatest = `-- name: ListMessagesLatest :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.bot_id = $1
|
|
ORDER BY m.created_at DESC
|
|
LIMIT $2
|
|
`
|
|
|
|
type ListMessagesLatestParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
MaxCount int32 `json:"max_count"`
|
|
}
|
|
|
|
type ListMessagesLatestRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessagesLatest(ctx context.Context, arg ListMessagesLatestParams) ([]ListMessagesLatestRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessagesLatest, arg.BotID, arg.MaxCount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesLatestRow
|
|
for rows.Next() {
|
|
var i ListMessagesLatestRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessagesLatestBySession = `-- name: ListMessagesLatestBySession :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.session_id = $1
|
|
ORDER BY m.created_at DESC
|
|
LIMIT $2
|
|
`
|
|
|
|
type ListMessagesLatestBySessionParams struct {
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
MaxCount int32 `json:"max_count"`
|
|
}
|
|
|
|
type ListMessagesLatestBySessionRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessagesLatestBySession(ctx context.Context, arg ListMessagesLatestBySessionParams) ([]ListMessagesLatestBySessionRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessagesLatestBySession, arg.SessionID, arg.MaxCount)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesLatestBySessionRow
|
|
for rows.Next() {
|
|
var i ListMessagesLatestBySessionRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessagesSince = `-- name: ListMessagesSince :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.bot_id = $1
|
|
AND m.created_at >= $2
|
|
ORDER BY m.created_at ASC
|
|
`
|
|
|
|
type ListMessagesSinceParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
}
|
|
|
|
type ListMessagesSinceRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessagesSince(ctx context.Context, arg ListMessagesSinceParams) ([]ListMessagesSinceRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessagesSince, arg.BotID, arg.CreatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesSinceRow
|
|
for rows.Next() {
|
|
var i ListMessagesSinceRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listMessagesSinceBySession = `-- name: ListMessagesSinceBySession :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.sender_account_user_id AS sender_user_id,
|
|
m.source_message_id AS external_message_id,
|
|
m.source_reply_to_message_id,
|
|
m.role,
|
|
m.content,
|
|
m.metadata,
|
|
m.usage,
|
|
m.event_id,
|
|
m.display_text,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
ci.avatar_url AS sender_avatar_url,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.session_id = $1
|
|
AND m.created_at >= $2
|
|
ORDER BY m.created_at ASC
|
|
`
|
|
|
|
type ListMessagesSinceBySessionParams struct {
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
}
|
|
|
|
type ListMessagesSinceBySessionRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
SenderUserID pgtype.UUID `json:"sender_user_id"`
|
|
ExternalMessageID pgtype.Text `json:"external_message_id"`
|
|
SourceReplyToMessageID pgtype.Text `json:"source_reply_to_message_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Metadata []byte `json:"metadata"`
|
|
Usage []byte `json:"usage"`
|
|
EventID pgtype.UUID `json:"event_id"`
|
|
DisplayText pgtype.Text `json:"display_text"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
SenderAvatarUrl pgtype.Text `json:"sender_avatar_url"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) ListMessagesSinceBySession(ctx context.Context, arg ListMessagesSinceBySessionParams) ([]ListMessagesSinceBySessionRow, error) {
|
|
rows, err := q.db.Query(ctx, listMessagesSinceBySession, arg.SessionID, arg.CreatedAt)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListMessagesSinceBySessionRow
|
|
for rows.Next() {
|
|
var i ListMessagesSinceBySessionRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.SenderUserID,
|
|
&i.ExternalMessageID,
|
|
&i.SourceReplyToMessageID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Metadata,
|
|
&i.Usage,
|
|
&i.EventID,
|
|
&i.DisplayText,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.SenderAvatarUrl,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listObservedConversationsByChannelIdentity = `-- name: ListObservedConversationsByChannelIdentity :many
|
|
WITH observed_routes AS (
|
|
SELECT
|
|
s.route_id,
|
|
MAX(m.created_at)::timestamptz AS last_observed_at
|
|
FROM bot_history_messages m
|
|
JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.bot_id = $1
|
|
AND m.sender_channel_identity_id = $2::uuid
|
|
AND s.route_id IS NOT NULL
|
|
GROUP BY s.route_id
|
|
)
|
|
SELECT
|
|
r.id AS route_id,
|
|
r.channel_type AS channel,
|
|
CASE
|
|
WHEN LOWER(COALESCE(r.conversation_type, '')) IN ('thread', 'topic') THEN 'thread'
|
|
WHEN LOWER(COALESCE(r.conversation_type, '')) IN ('p2p', 'private', 'direct', 'dm') THEN 'private'
|
|
ELSE 'group'
|
|
END AS conversation_type,
|
|
r.external_conversation_id AS conversation_id,
|
|
COALESCE(r.external_thread_id, '') AS thread_id,
|
|
COALESCE(
|
|
NULLIF(TRIM(COALESCE(r.metadata->>'conversation_name', '')), ''),
|
|
NULLIF(TRIM(COALESCE(r.metadata->>'conversation_handle', '')), ''),
|
|
''
|
|
)::text AS conversation_name,
|
|
rr.last_observed_at
|
|
FROM observed_routes rr
|
|
JOIN bot_channel_routes r ON r.id = rr.route_id
|
|
GROUP BY
|
|
r.id,
|
|
r.channel_type,
|
|
r.conversation_type,
|
|
r.external_conversation_id,
|
|
r.external_thread_id,
|
|
r.metadata,
|
|
rr.last_observed_at
|
|
ORDER BY rr.last_observed_at DESC
|
|
`
|
|
|
|
type ListObservedConversationsByChannelIdentityParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
ChannelIdentityID pgtype.UUID `json:"channel_identity_id"`
|
|
}
|
|
|
|
type ListObservedConversationsByChannelIdentityRow struct {
|
|
RouteID pgtype.UUID `json:"route_id"`
|
|
Channel string `json:"channel"`
|
|
ConversationType string `json:"conversation_type"`
|
|
ConversationID string `json:"conversation_id"`
|
|
ThreadID string `json:"thread_id"`
|
|
ConversationName string `json:"conversation_name"`
|
|
LastObservedAt pgtype.Timestamptz `json:"last_observed_at"`
|
|
}
|
|
|
|
func (q *Queries) ListObservedConversationsByChannelIdentity(ctx context.Context, arg ListObservedConversationsByChannelIdentityParams) ([]ListObservedConversationsByChannelIdentityRow, error) {
|
|
rows, err := q.db.Query(ctx, listObservedConversationsByChannelIdentity, arg.BotID, arg.ChannelIdentityID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListObservedConversationsByChannelIdentityRow
|
|
for rows.Next() {
|
|
var i ListObservedConversationsByChannelIdentityRow
|
|
if err := rows.Scan(
|
|
&i.RouteID,
|
|
&i.Channel,
|
|
&i.ConversationType,
|
|
&i.ConversationID,
|
|
&i.ThreadID,
|
|
&i.ConversationName,
|
|
&i.LastObservedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listObservedConversationsByChannelType = `-- name: ListObservedConversationsByChannelType :many
|
|
WITH observed_routes AS (
|
|
SELECT
|
|
s.route_id,
|
|
MAX(m.created_at)::timestamptz AS last_observed_at
|
|
FROM bot_history_messages m
|
|
JOIN bot_sessions s ON s.id = m.session_id
|
|
JOIN bot_channel_routes r ON r.id = s.route_id
|
|
WHERE m.bot_id = $1
|
|
AND LOWER(TRIM(r.channel_type)) = LOWER(TRIM($2))
|
|
AND s.route_id IS NOT NULL
|
|
GROUP BY s.route_id
|
|
)
|
|
SELECT
|
|
r.id AS route_id,
|
|
r.channel_type AS channel,
|
|
CASE
|
|
WHEN LOWER(COALESCE(r.conversation_type, '')) IN ('thread', 'topic') THEN 'thread'
|
|
WHEN LOWER(COALESCE(r.conversation_type, '')) IN ('p2p', 'private', 'direct', 'dm') THEN 'private'
|
|
ELSE 'group'
|
|
END AS conversation_type,
|
|
r.external_conversation_id AS conversation_id,
|
|
COALESCE(r.external_thread_id, '') AS thread_id,
|
|
COALESCE(
|
|
NULLIF(TRIM(COALESCE(r.metadata->>'conversation_name', '')), ''),
|
|
NULLIF(TRIM(COALESCE(r.metadata->>'conversation_handle', '')), ''),
|
|
''
|
|
)::text AS conversation_name,
|
|
rr.last_observed_at
|
|
FROM observed_routes rr
|
|
JOIN bot_channel_routes r ON r.id = rr.route_id
|
|
GROUP BY
|
|
r.id,
|
|
r.channel_type,
|
|
r.conversation_type,
|
|
r.external_conversation_id,
|
|
r.external_thread_id,
|
|
r.metadata,
|
|
rr.last_observed_at
|
|
ORDER BY rr.last_observed_at DESC
|
|
`
|
|
|
|
type ListObservedConversationsByChannelTypeParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
ChannelType string `json:"channel_type"`
|
|
}
|
|
|
|
type ListObservedConversationsByChannelTypeRow struct {
|
|
RouteID pgtype.UUID `json:"route_id"`
|
|
Channel string `json:"channel"`
|
|
ConversationType string `json:"conversation_type"`
|
|
ConversationID string `json:"conversation_id"`
|
|
ThreadID string `json:"thread_id"`
|
|
ConversationName string `json:"conversation_name"`
|
|
LastObservedAt pgtype.Timestamptz `json:"last_observed_at"`
|
|
}
|
|
|
|
// Routes on this platform type where the bot has seen at least one message (any sender).
|
|
func (q *Queries) ListObservedConversationsByChannelType(ctx context.Context, arg ListObservedConversationsByChannelTypeParams) ([]ListObservedConversationsByChannelTypeRow, error) {
|
|
rows, err := q.db.Query(ctx, listObservedConversationsByChannelType, arg.BotID, arg.ChannelType)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListObservedConversationsByChannelTypeRow
|
|
for rows.Next() {
|
|
var i ListObservedConversationsByChannelTypeRow
|
|
if err := rows.Scan(
|
|
&i.RouteID,
|
|
&i.Channel,
|
|
&i.ConversationType,
|
|
&i.ConversationID,
|
|
&i.ThreadID,
|
|
&i.ConversationName,
|
|
&i.LastObservedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const listUncompactedMessagesBySession = `-- name: ListUncompactedMessagesBySession :many
|
|
SELECT id, bot_id, session_id, role, content, usage, sender_channel_identity_id, compact_id, created_at
|
|
FROM bot_history_messages
|
|
WHERE session_id = $1
|
|
AND compact_id IS NULL
|
|
ORDER BY created_at ASC
|
|
`
|
|
|
|
type ListUncompactedMessagesBySessionRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
Usage []byte `json:"usage"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
CompactID pgtype.UUID `json:"compact_id"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
}
|
|
|
|
func (q *Queries) ListUncompactedMessagesBySession(ctx context.Context, sessionID pgtype.UUID) ([]ListUncompactedMessagesBySessionRow, error) {
|
|
rows, err := q.db.Query(ctx, listUncompactedMessagesBySession, sessionID)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []ListUncompactedMessagesBySessionRow
|
|
for rows.Next() {
|
|
var i ListUncompactedMessagesBySessionRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.Usage,
|
|
&i.SenderChannelIdentityID,
|
|
&i.CompactID,
|
|
&i.CreatedAt,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|
|
|
|
const markMessagesCompacted = `-- name: MarkMessagesCompacted :exec
|
|
UPDATE bot_history_messages
|
|
SET compact_id = $1
|
|
WHERE id = ANY($2::uuid[])
|
|
`
|
|
|
|
type MarkMessagesCompactedParams struct {
|
|
CompactID pgtype.UUID `json:"compact_id"`
|
|
Column2 []pgtype.UUID `json:"column_2"`
|
|
}
|
|
|
|
func (q *Queries) MarkMessagesCompacted(ctx context.Context, arg MarkMessagesCompactedParams) error {
|
|
_, err := q.db.Exec(ctx, markMessagesCompacted, arg.CompactID, arg.Column2)
|
|
return err
|
|
}
|
|
|
|
const searchMessages = `-- name: SearchMessages :many
|
|
SELECT
|
|
m.id,
|
|
m.bot_id,
|
|
m.session_id,
|
|
m.sender_channel_identity_id,
|
|
m.role,
|
|
m.content,
|
|
m.created_at,
|
|
ci.display_name AS sender_display_name,
|
|
s.channel_type AS platform
|
|
FROM bot_history_messages m
|
|
LEFT JOIN channel_identities ci ON ci.id = m.sender_channel_identity_id
|
|
LEFT JOIN bot_sessions s ON s.id = m.session_id
|
|
WHERE m.bot_id = $1
|
|
AND ($2::uuid IS NULL OR m.session_id = $2::uuid)
|
|
AND ($3::uuid IS NULL OR m.sender_channel_identity_id = $3::uuid)
|
|
AND ($4::timestamptz IS NULL OR m.created_at >= $4::timestamptz)
|
|
AND ($5::timestamptz IS NULL OR m.created_at <= $5::timestamptz)
|
|
AND ($6::text IS NULL OR m.role = $6::text)
|
|
AND ($7::text IS NULL OR (
|
|
CASE
|
|
WHEN jsonb_typeof(m.content->'content') = 'string'
|
|
THEN m.content->>'content'
|
|
WHEN jsonb_typeof(m.content->'content') = 'array'
|
|
THEN (SELECT COALESCE(string_agg(elem->>'text', ' '), '')
|
|
FROM jsonb_array_elements(m.content->'content') AS elem
|
|
WHERE elem->>'type' = 'text')
|
|
ELSE ''
|
|
END
|
|
) ILIKE '%' || $7::text || '%')
|
|
ORDER BY m.created_at DESC
|
|
LIMIT $8
|
|
`
|
|
|
|
type SearchMessagesParams struct {
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
ContactID pgtype.UUID `json:"contact_id"`
|
|
StartTime pgtype.Timestamptz `json:"start_time"`
|
|
EndTime pgtype.Timestamptz `json:"end_time"`
|
|
Role pgtype.Text `json:"role"`
|
|
Keyword pgtype.Text `json:"keyword"`
|
|
MaxCount int32 `json:"max_count"`
|
|
}
|
|
|
|
type SearchMessagesRow struct {
|
|
ID pgtype.UUID `json:"id"`
|
|
BotID pgtype.UUID `json:"bot_id"`
|
|
SessionID pgtype.UUID `json:"session_id"`
|
|
SenderChannelIdentityID pgtype.UUID `json:"sender_channel_identity_id"`
|
|
Role string `json:"role"`
|
|
Content []byte `json:"content"`
|
|
CreatedAt pgtype.Timestamptz `json:"created_at"`
|
|
SenderDisplayName pgtype.Text `json:"sender_display_name"`
|
|
Platform pgtype.Text `json:"platform"`
|
|
}
|
|
|
|
func (q *Queries) SearchMessages(ctx context.Context, arg SearchMessagesParams) ([]SearchMessagesRow, error) {
|
|
rows, err := q.db.Query(ctx, searchMessages,
|
|
arg.BotID,
|
|
arg.SessionID,
|
|
arg.ContactID,
|
|
arg.StartTime,
|
|
arg.EndTime,
|
|
arg.Role,
|
|
arg.Keyword,
|
|
arg.MaxCount,
|
|
)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
var items []SearchMessagesRow
|
|
for rows.Next() {
|
|
var i SearchMessagesRow
|
|
if err := rows.Scan(
|
|
&i.ID,
|
|
&i.BotID,
|
|
&i.SessionID,
|
|
&i.SenderChannelIdentityID,
|
|
&i.Role,
|
|
&i.Content,
|
|
&i.CreatedAt,
|
|
&i.SenderDisplayName,
|
|
&i.Platform,
|
|
); err != nil {
|
|
return nil, err
|
|
}
|
|
items = append(items, i)
|
|
}
|
|
if err := rows.Err(); err != nil {
|
|
return nil, err
|
|
}
|
|
return items, nil
|
|
}
|