Streaming & SSE
Wort-für-Wort-Antworten, Heartbeats, Cost-Cap, Resume — die Architektur hinter dem Live-Chat.
TaskMonkey streamt Antworten live über Server-Sent Events (SSE). Seit dem Streaming-Overhaul am 2026-05-17 ist die Architektur:
- Echtes Token-Streaming vom LLM (Anthropic) durch unsere SSE-Pipe
- Heartbeats vor jedem LLM-Call und während Tool-Execution
- Strukturierte Error-Codes für gezieltes Client-Verhalten
- Cost-Cap pro Turn mit Pause-Schwelle
- SSE-Resume mit
Last-Event-IDfür Reconnects
Endpunkte
GET /chat/stream?chat_id=<id>&tenant=<code>&message=<urlencoded text>
GET /chat/resume?chat_id=<id>&tenant=<code>&since=<ISO 8601>
/chat/stream startet einen neuen Turn (User-Message → Agent-Loop).
/chat/resume replayed nur persistierte Messages seit since, ohne neuen Turn — für App-Resume nach Hintergrund.
Antwort-Header (beide):
Content-Type: text/event-stream
Cache-Control: no-cache
X-Accel-Buffering: no
Beide Endpoints senden direkt nach den Headern ein 2 KB Padding-Comment, damit Mittwald/Nginx-Proxies ihre Output-Buffer leeren und der iOS-Client das erste TCP-Paket sieht.
Event-Typen
| Event | Payload | Bedeutung |
|---|---|---|
status |
{"message":"..."} |
Server-Status, z. B. „Verarbeite Anfrage..." |
tool_start |
{"tool":"X","label":"...","args":{...}} |
Tool-Call startet. args nur für runPython. |
tool_progress |
{"tool":"X","elapsed":3.0} |
Tool läuft noch (Watchdog-Tick alle 3s). |
tool_complete |
{"tool":"X","success":true,"duration_ms":1234} |
Tool fertig. |
plan_update |
{"plan":[...],"progress":{"total":4,"done":1,...}} |
createPlan/updatePlan/getPlan ausgeführt — Snapshot für UI. |
message_delta |
{"delta":"...","final":false} |
Token-Delta vom Anthropic-Streaming. Wort-für-Wort UX. |
message |
{"content":"..."} |
Finaler Text-Block (formatReply-HTML). |
suggestions |
{"suggestions":[...],"toggles":[...],"inputs":[...]} |
Klickbare Folgevorschläge / Toggles / Input-Felder. |
task_changed |
{"task":"Bestellungen"} |
Unified-Mode: Bereich wurde gewechselt. |
error |
{"message":"...","code":"timeout"|"rate_limit"|"auth_provider"|"empty_reply"|"provider"|"internal","retry_after":10} |
Strukturierter Fehler. |
done |
{} |
Ende der Antwort. Letzter Event. |
Zusätzlich: SSE-Kommentar-Lines : <label> <timestamp>\n\n als Heartbeats. Vom Client ignoriert, halten aber die TCP-Connection lebendig — verhindert dass Mittwald oder iOS-NSURLSession die Verbindung idle-killt.
Jeder strukturierte Event hat einen id: N\n-Header (monoton). Bei einem Reconnect schickt der Client Last-Event-ID: N im Header oder ?last_event_id=N als Query, der Server ersetzt damit den Start-Counter.
Beispiel-Stream (Anthropic-Streaming, agentMode)
: padding-2kb
: llm-start 1768672921
id: 1
event: status
data: {"message":"Verarbeite Anfrage..."}
id: 2
event: message_delta
data: {"delta":"Ich lade","final":false}
id: 3
event: message_delta
data: {"delta":" zuerst die","final":false}
id: 4
event: tool_start
data: {"tool":"getSalesOrders","label":"Lade Bestellungen..."}
: tool-getSalesOrders 1768672924
id: 5
event: tool_progress
data: {"tool":"getSalesOrders","elapsed":3.0}
id: 6
event: tool_complete
data: {"tool":"getSalesOrders","success":true,"duration_ms":4521}
id: 7
event: plan_update
data: {"plan":[{"index":0,"title":"Daten laden","status":"done"},...],"progress":{"total":3,"done":1,"failed":0,"remaining":2}}
id: 8
event: message
data: {"content":"<b>3 Bestellungen</b> heute geladen."}
id: 9
event: suggestions
data: {"suggestions":["Details zu #1001","Alle Bestellungen heute"]}
id: 10
event: done
data: {}
Cost-Cap & Pause-Schwelle
Im agentMode mit gesetztem cost_cap_eur (z. B. 2.00 für agent_proto) akkumuliert ChatService Token-Verbrauch pro Turn und pausiert wenn die geschätzten EUR-Kosten den Cap überschreiten. Das wirkt statt des max_iterations-Caps — der hat eigenen Hinweis.
Pause-Schwelle-Verhalten:
- Letzte assistant-Antwort wird mit Hinweis-Text gepaart
- Suggestions
["Weitermachen", "Stop, das reicht"] - User-Klick „Weitermachen" → neuer Turn mit frischem Budget, History bleibt komplett
SSE-Resume Pattern
iOS-App nutzt App-Lifecycle:
@override
void didChangeAppLifecycleState(AppLifecycleState state) {
if (state == AppLifecycleState.resumed && _chatId != null) {
_maybeFetchCatchupViaResume();
}
}
_maybeFetchCatchupViaResume ruft GET /chat/resume?since=<letztes gesehenes assistant.created> und rendert die neuen Messages als normale Bubbles. Kein neuer Turn, kein LLM-Call.
Heartbeats (Comment-Lines)
PHP ist während eines synchronen Anthropic-cURL-Calls blockiert. Damit die Connection trotzdem lebt:
- Vor jedem LLM-Call:
: llm-start <ts>\n\nbzw: iter-N <ts>\n\n - Beim Anthropic-Streaming: jeder
text_deltatriggert sofort einenmessage_delta-Event durch unsere Pipe → konstanter Byte-Strom - Während Tool-Execution: pcntl-alarm-Watchdog tickt alle 3s mit
: tool-<name> <ts>\n\n+tool_progress-Event
Damit ist die längste Idle-Phase im Stream typischerweise < 5s.
Client-Implementation
JavaScript (Browser-Widget):
const source = new EventSource(
`/chat/stream?chat_id=unified_42&tenant=mecomedia&message=${encodeURIComponent(text)}`
);
source.addEventListener('status', e => showSpinner(JSON.parse(e.data).message));
source.addEventListener('tool_progress', e => {
const { tool, elapsed } = JSON.parse(e.data);
setStatusBubble(`${tool} läuft (${Math.round(elapsed)}s)`);
});
source.addEventListener('plan_update', e => {
const { plan, progress } = JSON.parse(e.data);
renderPlanCard(plan, progress);
});
source.addEventListener('message_delta', e => {
const { delta } = JSON.parse(e.data);
appendDeltaToCurrentBubble(delta);
});
source.addEventListener('message', e => appendToChat(JSON.parse(e.data).content));
source.addEventListener('suggestions', e => renderSuggestions(JSON.parse(e.data).suggestions));
source.addEventListener('error', e => {
const { code, message, retry_after } = JSON.parse(e.data);
if (code === 'empty_reply') return; // ignore — erwartetes graceful end
if (code === 'rate_limit') showToast(`Rate limit, retry in ${retry_after}s`);
else showError(message);
});
source.addEventListener('done', () => source.close());
Dart (Flutter): nutzt nativen URLSessionDataDelegate auf iOS für SSE-Stabilität. Siehe Mobile App.
Nicht-Streaming Fallback
Wenn dein Client kein SSE unterstützt (z. B. eine server-zu-server-Integration), nutze:
POST /chat/index
Content-Type: application/json
{
"chat_id": "...",
"tenant": "...",
"message": "..."
}
Die Antwort kommt als ein JSON-Objekt, sobald sie komplett ist (ggf. mehrere Minuten). error_code-Feld wenn Fehler.
Proxy-Fallen
- nginx:
proxy_buffering off;auf der/chat/stream- und/chat/resume-Location - CloudFlare: funktioniert, aber „Rocket Loader" deaktivieren
- Mittwald: das 2 KB Padding nach den Headern ist Pflicht — ohne wartet der Proxy auf mehr Bytes bevor er den Stream weiterreicht
Reconnection
- Mit
Last-Event-ID-Header re-connecten — der Server setzt den Start-Counter entsprechend - Bei
error-Codetimeoutoderrate_limit: nachretry_afterSekunden retry - Bei
internaloderauth_provider: nicht automatisch retry — User informieren - Wenn die App im Hintergrund war: nicht den Stream resumen, sondern
GET /chat/resume?since=...aufrufen — schiebt die persistierten Updates rein, ohne den Turn neu zu starten
Verwandte Doks
- Mobile App — Flutter-SSE-Implementation, App-Resume
- Agent-Mode — Plan-Exec-Reflect-Loop, Reference-Pattern, Cost-Cap
- Async-Jobs — Background-Worker,
waitForAsyncJob