feat: heartbeat (#108)

* feat: heartbeat

* feat: independent heartbeat model
This commit is contained in:
Acbox Liu
2026-02-24 19:25:02 +08:00
committed by Ran
parent a9680587c6
commit 2f38662d4d
47 changed files with 2276 additions and 43 deletions
+136
View File
@@ -22,6 +22,7 @@ import (
"github.com/memohai/memoh/internal/conversation"
"github.com/memohai/memoh/internal/db"
"github.com/memohai/memoh/internal/db/sqlc"
"github.com/memohai/memoh/internal/heartbeat"
"github.com/memohai/memoh/internal/inbox"
"github.com/memohai/memoh/internal/memory"
messagepkg "github.com/memohai/memoh/internal/message"
@@ -196,6 +197,7 @@ type gatewayRequest struct {
type gatewayResponse struct {
Messages []conversation.ModelMessage `json:"messages"`
Skills []string `json:"skills"`
Text string `json:"text,omitempty"`
Usage json.RawMessage `json:"usage,omitempty"`
Usages []json.RawMessage `json:"usages,omitempty"`
}
@@ -240,6 +242,35 @@ func (t triggerScheduleRequest) MarshalJSON() ([]byte, error) {
return json.Marshal(m)
}
// gatewayHeartbeat matches the agent gateway HeartbeatModel for /chat/trigger-heartbeat.
type gatewayHeartbeat struct {
Interval int `json:"interval"`
}
// triggerHeartbeatRequest is the payload for POST /chat/trigger-heartbeat.
type triggerHeartbeatRequest struct {
gatewayRequest
Heartbeat gatewayHeartbeat `json:"heartbeat"`
}
// MarshalJSON marshals the request without the "query" field for trigger-heartbeat.
func (t triggerHeartbeatRequest) MarshalJSON() ([]byte, error) {
type alias struct {
gatewayRequest
Heartbeat gatewayHeartbeat `json:"heartbeat"`
}
raw, err := json.Marshal(alias(t))
if err != nil {
return nil, err
}
var m map[string]json.RawMessage
if err := json.Unmarshal(raw, &m); err != nil {
return nil, err
}
delete(m, "query")
return json.Marshal(m)
}
// --- resolved context (shared by Chat / StreamChat / TriggerSchedule) ---
type resolvedContext struct {
@@ -511,6 +542,75 @@ func (r *Resolver) TriggerSchedule(ctx context.Context, botID string, payload sc
return r.storeRound(ctx, req, resp.Messages, resp.Usage, resp.Usages)
}
// --- TriggerHeartbeat ---
// TriggerHeartbeat executes a heartbeat check through the agent gateway trigger-heartbeat endpoint.
func (r *Resolver) TriggerHeartbeat(ctx context.Context, botID string, payload heartbeat.TriggerPayload, token string) (heartbeat.TriggerResult, error) {
if strings.TrimSpace(botID) == "" {
return heartbeat.TriggerResult{}, fmt.Errorf("bot id is required")
}
// If a dedicated heartbeat model is configured, use it instead of the
// default chat model. We load the bot settings first so that we can
// set req.Model, which takes highest priority in selectChatModel.
var heartbeatModel string
if botSettings, err := r.loadBotSettings(ctx, botID); err == nil {
heartbeatModel = strings.TrimSpace(botSettings.HeartbeatModelID)
}
req := conversation.ChatRequest{
BotID: botID,
ChatID: botID,
Query: "heartbeat",
UserID: payload.OwnerUserID,
Token: token,
Model: heartbeatModel,
}
rc, err := r.resolve(ctx, req)
if err != nil {
return heartbeat.TriggerResult{}, err
}
hbPayload := rc.payload
hbPayload.Identity.ChannelIdentityID = strings.TrimSpace(payload.OwnerUserID)
hbPayload.Identity.DisplayName = "Heartbeat"
triggerReq := triggerHeartbeatRequest{
gatewayRequest: hbPayload,
Heartbeat: gatewayHeartbeat{
Interval: payload.Interval,
},
}
resp, err := r.postTriggerHeartbeat(ctx, triggerReq, token)
if err != nil {
return heartbeat.TriggerResult{}, err
}
status := "alert"
text := strings.TrimSpace(resp.Text)
if isHeartbeatOK(text) {
status = "ok"
}
var usageBytes []byte
if resp.Usage != nil {
usageBytes, _ = json.Marshal(resp.Usage)
}
return heartbeat.TriggerResult{
Status: status,
Text: text,
Usage: resp.Usage,
UsageBytes: usageBytes,
}, nil
}
func isHeartbeatOK(text string) bool {
t := strings.TrimSpace(text)
return strings.HasPrefix(t, "HEARTBEAT_OK") || strings.HasSuffix(t, "HEARTBEAT_OK") || t == "HEARTBEAT_OK"
}
// --- StreamChat ---
// StreamChat sends a streaming chat request to the agent gateway.
@@ -642,6 +742,42 @@ func (r *Resolver) postTriggerSchedule(ctx context.Context, payload triggerSched
return parsed, nil
}
// postTriggerHeartbeat sends a trigger-heartbeat request to the agent gateway.
func (r *Resolver) postTriggerHeartbeat(ctx context.Context, payload triggerHeartbeatRequest, token string) (gatewayResponse, error) {
url := r.gatewayBaseURL + "/chat/trigger-heartbeat"
r.logger.Info("gateway trigger-heartbeat request", slog.String("url", url))
httpReq, err := newJSONRequestWithContext(ctx, http.MethodPost, url, payload)
if err != nil {
return gatewayResponse{}, err
}
if strings.TrimSpace(token) != "" {
httpReq.Header.Set("Authorization", token)
}
resp, err := r.httpClient.Do(httpReq)
if err != nil {
return gatewayResponse{}, err
}
defer resp.Body.Close()
respBody, err := io.ReadAll(resp.Body)
if err != nil {
return gatewayResponse{}, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
r.logger.Error("gateway trigger-heartbeat error", slog.String("url", url), slog.Int("status", resp.StatusCode), slog.String("body_prefix", truncate(string(respBody), 300)))
return gatewayResponse{}, fmt.Errorf("agent gateway error: %s", strings.TrimSpace(string(respBody)))
}
var parsed gatewayResponse
if err := json.Unmarshal(respBody, &parsed); err != nil {
r.logger.Error("gateway trigger-heartbeat response parse failed", slog.String("body_prefix", truncate(string(respBody), 300)), slog.Any("error", err))
return gatewayResponse{}, fmt.Errorf("failed to parse gateway response: %w", err)
}
return parsed, nil
}
func (r *Resolver) streamChat(ctx context.Context, payload gatewayRequest, req conversation.ChatRequest, chunkCh chan<- conversation.StreamChunk) error {
url := r.gatewayBaseURL + "/chat/stream"
r.logger.Info(