added Tokens, openrouter, memory system
CI / check (push) Successful in 4m11s
CI / test (push) Successful in 3m57s
CI / clippy (push) Has been cancelled

This commit is contained in:
Sithies
2026-03-21 19:59:07 +01:00
parent 4e6b2c6759
commit 18b666f45d
41 changed files with 3217 additions and 258 deletions
+142
View File
@@ -0,0 +1,142 @@
// memory/src/conversation.rs
use anyhow::Result;
use chrono::Utc;
use sqlx::{SqlitePool, Row};
use crate::models::{Conversation, ConversationMessage};
pub struct ConversationStore<'a> {
pool: &'a SqlitePool,
agent_id: &'a str,
}
impl<'a> ConversationStore<'a> {
pub fn new(pool: &'a SqlitePool, agent_id: &'a str) -> Self {
Self { pool, agent_id }
}
pub async fn get_or_create(&self, timeout_mins: u64) -> Result<Conversation> {
let today = Utc::now().format("%Y-%m-%d").to_string();
let now = Utc::now().timestamp();
let cutoff = now - (timeout_mins * 60) as i64;
let row = sqlx::query(
"SELECT id, agent_id, date, summary, closed, created_at
FROM conversations
WHERE agent_id = ? AND closed = 0 AND date = ? AND created_at > ?
ORDER BY created_at DESC
LIMIT 1"
)
.bind(self.agent_id)
.bind(&today)
.bind(cutoff)
.fetch_optional(self.pool)
.await?;
if let Some(r) = row {
return Ok(Conversation {
id: r.get("id"),
agent_id: r.get("agent_id"),
date: r.get("date"),
summary: r.get("summary"),
closed: r.get::<i64, _>("closed") != 0,
created_at: chrono::DateTime::from_timestamp(r.get("created_at"), 0)
.unwrap_or_default(),
});
}
let id = sqlx::query(
"INSERT INTO conversations (agent_id, date, closed, created_at)
VALUES (?, ?, 0, ?)"
)
.bind(self.agent_id)
.bind(&today)
.bind(now)
.execute(self.pool)
.await?
.last_insert_rowid();
Ok(Conversation {
id,
agent_id: self.agent_id.to_string(),
date: today,
summary: None,
closed: false,
created_at: Utc::now(),
})
}
pub async fn save_message(
&self,
conversation_id: i64,
role: &str,
content: &str,
) -> Result<()> {
let now = Utc::now().timestamp();
sqlx::query(
"INSERT INTO messages (conversation_id, role, content, timestamp)
VALUES (?, ?, ?, ?)"
)
.bind(conversation_id)
.bind(role)
.bind(content)
.bind(now)
.execute(self.pool)
.await?;
Ok(())
}
pub async fn load_window(
&self,
conversation_id: i64,
window: usize,
) -> Result<Vec<ConversationMessage>> {
let rows = sqlx::query(
"SELECT id, conversation_id, role, content, timestamp
FROM messages
WHERE conversation_id = ?
ORDER BY timestamp DESC
LIMIT ?"
)
.bind(conversation_id)
.bind(window as i64)
.fetch_all(self.pool)
.await?;
let messages = rows.into_iter().rev().map(|r| ConversationMessage {
id: r.get("id"),
conversation_id: r.get("conversation_id"),
role: r.get("role"),
content: r.get("content"),
timestamp: chrono::DateTime::from_timestamp(r.get("timestamp"), 0)
.unwrap_or_default(),
}).collect();
Ok(messages)
}
pub async fn close(&self, conversation_id: i64, summary: Option<&str>) -> Result<()> {
sqlx::query(
"UPDATE conversations SET closed = 1, summary = ? WHERE id = ?"
)
.bind(summary)
.bind(conversation_id)
.execute(self.pool)
.await?;
Ok(())
}
pub async fn last_summary(&self) -> Result<Option<String>> {
let row = sqlx::query(
"SELECT summary FROM conversations
WHERE agent_id = ? AND closed = 1 AND summary IS NOT NULL
ORDER BY created_at DESC
LIMIT 1"
)
.bind(self.agent_id)
.fetch_optional(self.pool)
.await?;
Ok(row.map(|r| r.get("summary")))
}
}
+105
View File
@@ -0,0 +1,105 @@
// memory/src/facts.rs
use anyhow::Result;
use chrono::Utc;
use sqlx::{SqlitePool, Row};
use tracing::warn;
use crate::models::{Fact, CategorySummary};
pub const DEFAULT_CATEGORIES: &[&str] = &[
"persönlich",
"präferenzen",
"gewohnheiten",
"beziehungen",
"arbeit",
];
pub struct FactStore<'a> {
pool: &'a SqlitePool,
agent_id: &'a str,
}
impl<'a> FactStore<'a> {
pub fn new(pool: &'a SqlitePool, agent_id: &'a str) -> Self {
Self { pool, agent_id }
}
pub async fn upsert(&self, category: &str, key: &str, value: &str) -> Result<()> {
if !DEFAULT_CATEGORIES.contains(&category) {
warn!(
category = %category,
agent = %self.agent_id,
"Neue Fakten-Kategorie angelegt"
);
}
let now = Utc::now().timestamp();
sqlx::query(
"INSERT INTO facts (agent_id, category, key, value, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(agent_id, category, key)
DO UPDATE SET value = excluded.value, updated_at = excluded.updated_at"
)
.bind(self.agent_id)
.bind(category)
.bind(key)
.bind(value)
.bind(now)
.execute(self.pool)
.await?;
Ok(())
}
pub async fn delete(&self, category: &str, key: &str) -> Result<()> {
sqlx::query(
"DELETE FROM facts WHERE agent_id = ? AND category = ? AND key = ?"
)
.bind(self.agent_id)
.bind(category)
.bind(key)
.execute(self.pool)
.await?;
Ok(())
}
pub async fn get_category(&self, category: &str) -> Result<Vec<Fact>> {
let rows = sqlx::query(
"SELECT id, agent_id, category, key, value, updated_at
FROM facts
WHERE agent_id = ? AND category = ?
ORDER BY key"
)
.bind(self.agent_id)
.bind(category)
.fetch_all(self.pool)
.await?;
Ok(rows.into_iter().map(|r| Fact {
id: r.get("id"),
agent_id: r.get("agent_id"),
category: r.get("category"),
key: r.get("key"),
value: r.get("value"),
updated_at: chrono::DateTime::from_timestamp(r.get("updated_at"), 0)
.unwrap_or_default(),
}).collect())
}
pub async fn category_summaries(&self) -> Result<Vec<CategorySummary>> {
let rows = sqlx::query(
"SELECT category, COUNT(*) as count
FROM facts
WHERE agent_id = ?
GROUP BY category
ORDER BY category"
)
.bind(self.agent_id)
.fetch_all(self.pool)
.await?;
Ok(rows.into_iter().map(|r| CategorySummary {
category: r.get("category"),
count: r.get("count"),
}).collect())
}
}
+15 -1
View File
@@ -1 +1,15 @@
// Nazarick - 3-layer memory system for context management
// memory/src/lib.rs
pub mod models;
pub mod store;
pub mod conversation;
pub mod facts;
pub mod memory_impl;
pub mod summarizer;
pub mod usage;
pub use store::MemoryStore;
pub use conversation::ConversationStore;
pub use facts::{FactStore, DEFAULT_CATEGORIES};
pub use models::{Conversation, ConversationMessage, Fact, CategorySummary};
pub use summarizer::Summarizer;
+96
View File
@@ -0,0 +1,96 @@
// memory/src/impl.rs
use async_trait::async_trait;
use nazarick_core::memory::{
Memory, MemoryMessage, MemoryFact, MemoryCategorySummary
};
use nazarick_core::error::NazarickError;
use crate::store::MemoryStore;
use crate::conversation::ConversationStore;
use crate::facts::FactStore;
use crate::usage::UsageStore;
type Result<T> = std::result::Result<T, NazarickError>;
#[async_trait]
impl Memory for MemoryStore {
async fn get_or_create_conversation(&self, timeout_mins: u64) -> Result<i64> {
let store = ConversationStore::new(&self.pool, &self.agent_id);
let conv = store.get_or_create(timeout_mins).await
.map_err(|e| NazarickError::Memory(e.to_string()))?;
Ok(conv.id)
}
async fn save_message(&self, conversation_id: i64, role: &str, content: &str) -> Result<()> {
let store = ConversationStore::new(&self.pool, &self.agent_id);
store.save_message(conversation_id, role, content).await
.map_err(|e| NazarickError::Memory(e.to_string()))
}
async fn load_window(&self, conversation_id: i64, window: usize) -> Result<Vec<MemoryMessage>> {
let store = ConversationStore::new(&self.pool, &self.agent_id);
let messages = store.load_window(conversation_id, window).await
.map_err(|e| NazarickError::Memory(e.to_string()))?;
Ok(messages.into_iter().map(|m| MemoryMessage {
role: m.role,
content: m.content,
}).collect())
}
async fn last_summary(&self) -> Result<Option<String>> {
let store = ConversationStore::new(&self.pool, &self.agent_id);
store.last_summary().await
.map_err(|e| NazarickError::Memory(e.to_string()))
}
async fn close_conversation(&self, conversation_id: i64, summary: Option<&str>) -> Result<()> {
let store = ConversationStore::new(&self.pool, &self.agent_id);
store.close(conversation_id, summary).await
.map_err(|e| NazarickError::Memory(e.to_string()))
}
async fn upsert_fact(&self, category: &str, key: &str, value: &str) -> Result<()> {
let store = FactStore::new(&self.pool, &self.agent_id);
store.upsert(category, key, value).await
.map_err(|e| NazarickError::Memory(e.to_string()))
}
async fn delete_fact(&self, category: &str, key: &str) -> Result<()> {
let store = FactStore::new(&self.pool, &self.agent_id);
store.delete(category, key).await
.map_err(|e| NazarickError::Memory(e.to_string()))
}
async fn get_category(&self, category: &str) -> Result<Vec<MemoryFact>> {
let store = FactStore::new(&self.pool, &self.agent_id);
let facts = store.get_category(category).await
.map_err(|e| NazarickError::Memory(e.to_string()))?;
Ok(facts.into_iter().map(|f| MemoryFact {
category: f.category,
key: f.key,
value: f.value,
}).collect())
}
async fn category_summaries(&self) -> Result<Vec<MemoryCategorySummary>> {
let store = FactStore::new(&self.pool, &self.agent_id);
let summaries = store.category_summaries().await
.map_err(|e| NazarickError::Memory(e.to_string()))?;
Ok(summaries.into_iter().map(|s| MemoryCategorySummary {
category: s.category,
count: s.count,
}).collect())
}
async fn log_usage(
&self,
tokens_input: u64,
tokens_output: u64,
cost: Option<f64>,
finish_reason: Option<&str>,
) -> Result<()> {
let store = UsageStore { pool: &self.pool, agent_id: &self.agent_id };
store.log(tokens_input, tokens_output, cost, finish_reason).await
.map_err(|e| NazarickError::Memory(e.to_string()))
}
}
+50
View File
@@ -0,0 +1,50 @@
// memory/src/models.rs
//
// Shared Structs — werden von conversation.rs und facts.rs genutzt.
use chrono::{DateTime, Utc};
// ─── Konversation ─────────────────────────────────────────────────────────────
/// Ein Gespräch — Container für eine zusammenhängende Nachrichtenfolge.
/// Wird geschlossen wenn Timeout oder Tageswechsel eintritt.
#[derive(Debug, Clone)]
pub struct Conversation {
pub id: i64,
pub agent_id: String,
pub date: String, // "2026-03-18"
pub summary: Option<String>,
pub closed: bool,
pub created_at: DateTime<Utc>,
}
/// Eine einzelne Nachricht in einem Gespräch.
#[derive(Debug, Clone)]
pub struct ConversationMessage {
pub id: i64,
pub conversation_id: i64,
pub role: String,
pub content: String,
pub timestamp: DateTime<Utc>,
}
// ─── Facts ────────────────────────────────────────────────────────────────────
/// Ein gespeicherter Fakt über den User.
#[derive(Debug, Clone)]
pub struct Fact {
pub id: i64,
pub agent_id: String,
pub category: String, // "persönlich" | "präferenzen" | ...
pub key: String, // "name" | "kaffee" | ...
pub value: String,
pub updated_at: DateTime<Utc>,
}
/// Übersicht einer Kategorie — nur für den Prompt-Block.
/// Kein Inhalt, nur Name + Anzahl Einträge.
#[derive(Debug, Clone)]
pub struct CategorySummary {
pub category: String,
pub count: i64,
}
+93
View File
@@ -0,0 +1,93 @@
// memory/src/store.rs
//
// SQLite Verbindung + Schema-Setup.
// Eine DB-Datei pro Agent — saubere Trennung.
use sqlx::SqlitePool;
use anyhow::Result;
pub struct MemoryStore {
pub pool: SqlitePool,
pub agent_id: String,
}
impl MemoryStore {
/// Öffnet oder erstellt die SQLite DB für einen Agenten.
/// `agent_id` → "sebas_tian" → "data/sebas_tian.db"
pub async fn open(agent_id: &str) -> Result<Self> {
// data/ Ordner anlegen falls nicht vorhanden
tokio::fs::create_dir_all("data").await?;
let path = format!("data/{}.db", agent_id);
// SQLite URL — create_if_missing erstellt die Datei automatisch
let url = format!("sqlite://{}?mode=rwc", path);
let pool = SqlitePool::connect(&url).await?;
let store = Self { pool, agent_id: agent_id.to_string() };
store.migrate().await?;
Ok(store)
}
/// Erstellt alle Tabellen falls sie noch nicht existieren.
/// Idempotent — kann mehrfach aufgerufen werden.
async fn migrate(&self) -> Result<()> {
sqlx::query(
"CREATE TABLE IF NOT EXISTS conversations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
date TEXT NOT NULL,
summary TEXT,
closed INTEGER NOT NULL DEFAULT 0,
created_at INTEGER NOT NULL
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
conversation_id INTEGER NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
timestamp INTEGER NOT NULL,
FOREIGN KEY (conversation_id) REFERENCES conversations(id)
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS facts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
updated_at INTEGER NOT NULL,
UNIQUE(agent_id, category, key)
)"
)
.execute(&self.pool)
.await?;
sqlx::query(
"CREATE TABLE IF NOT EXISTS usage_log (
id INTEGER PRIMARY KEY AUTOINCREMENT,
agent_id TEXT NOT NULL,
timestamp INTEGER NOT NULL,
tokens_input INTEGER NOT NULL,
tokens_output INTEGER NOT NULL,
cost REAL,
finish_reason TEXT
)"
)
.execute(&self.pool)
.await?;
Ok(())
}
}
+117
View File
@@ -0,0 +1,117 @@
// memory/src/summarizer.rs
use anyhow::Result as AnyhowResult;
use async_trait::async_trait;
use reqwest::Client;
use serde::{Deserialize, Serialize};
use nazarick_core::error::NazarickError;
pub struct Summarizer {
client: Client,
url: String,
model: String,
max_summary_tokens: usize,
}
impl Summarizer {
pub fn new(
url: impl Into<String>,
model: impl Into<String>,
max_summary_tokens: usize,
) -> Self {
Self {
client: Client::new(),
url: url.into(),
model: model.into(),
max_summary_tokens,
}
}
async fn do_summarize(&self, messages: &[(String, String)]) -> AnyhowResult<String> {
let conversation = messages.iter()
.map(|(role, content)| format!("{}: {}", role, content))
.collect::<Vec<_>>()
.join("\n");
// Input begrenzen — von hinten kürzen damit neueste Nachrichten erhalten bleiben
let max_chars = self.max_summary_tokens * 4;
let conversation = if conversation.len() > max_chars {
let start = conversation.len() - max_chars;
let mut idx = start;
while !conversation.is_char_boundary(idx) {
idx += 1;
}
conversation[idx..].to_string()
} else {
conversation
};
let prompt = format!(
"Fasse das folgende Gespräch in 3-5 Sätzen zusammen. \
Fokus auf wichtige Fakten, Entscheidungen und Kontext. \
Keine Begrüßungen oder Smalltalk. Nur das Wesentliche.\n\n{}",
conversation
);
let request = SummaryRequest {
model: self.model.clone(),
messages: vec![
SummaryMessage { role: "user".to_string(), content: prompt }
],
max_tokens: 256,
temperature: 0.3,
};
let response = self.client
.post(format!("{}/v1/chat/completions", self.url))
.json(&request)
.send()
.await?
.error_for_status()?
.json::<SummaryResponse>()
.await?;
let summary = response.choices
.into_iter()
.next()
.map(|c| c.message.content)
.unwrap_or_default();
Ok(summary)
}
}
#[async_trait]
impl nazarick_core::summarizer::Summarizer for Summarizer {
async fn summarize(
&self,
messages: &[(String, String)],
) -> std::result::Result<String, NazarickError> {
self.do_summarize(messages).await
.map_err(|e| NazarickError::Memory(e.to_string()))
}
}
#[derive(Serialize)]
struct SummaryRequest {
model: String,
messages: Vec<SummaryMessage>,
max_tokens: u32,
temperature: f32,
}
#[derive(Serialize, Deserialize)]
struct SummaryMessage {
role: String,
content: String,
}
#[derive(Deserialize)]
struct SummaryResponse {
choices: Vec<SummaryChoice>,
}
#[derive(Deserialize)]
struct SummaryChoice {
message: SummaryMessage,
}
+59
View File
@@ -0,0 +1,59 @@
// memory/src/usage.rs
//
// Logging von Token-Verbrauch und Kosten pro LLM-Call.
use anyhow::Result;
use sqlx::SqlitePool;
pub struct UsageStore<'a> {
pub pool: &'a SqlitePool,
pub agent_id: &'a str,
}
impl<'a> UsageStore<'a> {
/// Speichert einen LLM-Call in usage_log.
pub async fn log(
&self,
tokens_input: u64,
tokens_output: u64,
cost: Option<f64>,
finish_reason: Option<&str>,
) -> Result<()> {
let now = chrono::Utc::now().timestamp();
sqlx::query(
"INSERT INTO usage_log (agent_id, timestamp, tokens_input, tokens_output, cost, finish_reason)
VALUES (?, ?, ?, ?, ?, ?)"
)
.bind(self.agent_id)
.bind(now)
.bind(tokens_input as i64)
.bind(tokens_output as i64)
.bind(cost)
.bind(finish_reason)
.execute(self.pool)
.await?;
Ok(())
}
/// Gibt Gesamtkosten und Token-Summen zurück.
pub async fn totals(&self) -> Result<UsageTotals> {
let row = sqlx::query_as::<_, UsageTotals>(
"SELECT
COALESCE(SUM(tokens_input), 0) as total_input,
COALESCE(SUM(tokens_output), 0) as total_output,
COALESCE(SUM(cost), 0.0) as total_cost
FROM usage_log WHERE agent_id = ?"
)
.bind(self.agent_id)
.fetch_one(self.pool)
.await?;
Ok(row)
}
}
#[derive(Debug, sqlx::FromRow)]
pub struct UsageTotals {
pub total_input: i64,
pub total_output: i64,
pub total_cost: f64,
}