TaskMonkey Handbuch

Streaming & SSE

Wort-für-Wort-Antworten, Heartbeats, Cost-Cap, Resume — die Architektur hinter dem Live-Chat.

TaskMonkey streamt Antworten live über Server-Sent Events (SSE). Seit dem Streaming-Overhaul am 2026-05-17 ist die Architektur:

  • Echtes Token-Streaming vom LLM (Anthropic) durch unsere SSE-Pipe
  • Heartbeats vor jedem LLM-Call und während Tool-Execution
  • Strukturierte Error-Codes für gezieltes Client-Verhalten
  • Cost-Cap pro Turn mit Pause-Schwelle
  • SSE-Resume mit Last-Event-ID für Reconnects

Endpunkte

GET /chat/stream?chat_id=<id>&tenant=<code>&message=<urlencoded text>
GET /chat/resume?chat_id=<id>&tenant=<code>&since=<ISO 8601>

/chat/stream startet einen neuen Turn (User-Message → Agent-Loop). /chat/resume replayed nur persistierte Messages seit since, ohne neuen Turn — für App-Resume nach Hintergrund.

Antwort-Header (beide):

Content-Type: text/event-stream
Cache-Control: no-cache
X-Accel-Buffering: no

Beide Endpoints senden direkt nach den Headern ein 2 KB Padding-Comment, damit Mittwald/Nginx-Proxies ihre Output-Buffer leeren und der iOS-Client das erste TCP-Paket sieht.

Event-Typen

Event Payload Bedeutung
status {"message":"..."} Server-Status, z. B. „Verarbeite Anfrage..."
tool_start {"tool":"X","label":"...","args":{...}} Tool-Call startet. args nur für runPython.
tool_progress {"tool":"X","elapsed":3.0} Tool läuft noch (Watchdog-Tick alle 3s).
tool_complete {"tool":"X","success":true,"duration_ms":1234} Tool fertig.
plan_update {"plan":[...],"progress":{"total":4,"done":1,...}} createPlan/updatePlan/getPlan ausgeführt — Snapshot für UI.
message_delta {"delta":"...","final":false} Token-Delta vom Anthropic-Streaming. Wort-für-Wort UX.
message {"content":"..."} Finaler Text-Block (formatReply-HTML).
suggestions {"suggestions":[...],"toggles":[...],"inputs":[...]} Klickbare Folgevorschläge / Toggles / Input-Felder.
task_changed {"task":"Bestellungen"} Unified-Mode: Bereich wurde gewechselt.
error {"message":"...","code":"timeout"|"rate_limit"|"auth_provider"|"empty_reply"|"provider"|"internal","retry_after":10} Strukturierter Fehler.
done {} Ende der Antwort. Letzter Event.

Zusätzlich: SSE-Kommentar-Lines : <label> <timestamp>\n\n als Heartbeats. Vom Client ignoriert, halten aber die TCP-Connection lebendig — verhindert dass Mittwald oder iOS-NSURLSession die Verbindung idle-killt.

Jeder strukturierte Event hat einen id: N\n-Header (monoton). Bei einem Reconnect schickt der Client Last-Event-ID: N im Header oder ?last_event_id=N als Query, der Server ersetzt damit den Start-Counter.

Beispiel-Stream (Anthropic-Streaming, agentMode)

: padding-2kb

: llm-start 1768672921

id: 1
event: status
data: {"message":"Verarbeite Anfrage..."}

id: 2
event: message_delta
data: {"delta":"Ich lade","final":false}

id: 3
event: message_delta
data: {"delta":" zuerst die","final":false}

id: 4
event: tool_start
data: {"tool":"getSalesOrders","label":"Lade Bestellungen..."}

: tool-getSalesOrders 1768672924

id: 5
event: tool_progress
data: {"tool":"getSalesOrders","elapsed":3.0}

id: 6
event: tool_complete
data: {"tool":"getSalesOrders","success":true,"duration_ms":4521}

id: 7
event: plan_update
data: {"plan":[{"index":0,"title":"Daten laden","status":"done"},...],"progress":{"total":3,"done":1,"failed":0,"remaining":2}}

id: 8
event: message
data: {"content":"<b>3 Bestellungen</b> heute geladen."}

id: 9
event: suggestions
data: {"suggestions":["Details zu #1001","Alle Bestellungen heute"]}

id: 10
event: done
data: {}

Cost-Cap & Pause-Schwelle

Im agentMode mit gesetztem cost_cap_eur (z. B. 2.00 für agent_proto) akkumuliert ChatService Token-Verbrauch pro Turn und pausiert wenn die geschätzten EUR-Kosten den Cap überschreiten. Das wirkt statt des max_iterations-Caps — der hat eigenen Hinweis.

Pause-Schwelle-Verhalten:

  • Letzte assistant-Antwort wird mit Hinweis-Text gepaart
  • Suggestions ["Weitermachen", "Stop, das reicht"]
  • User-Klick „Weitermachen" → neuer Turn mit frischem Budget, History bleibt komplett

SSE-Resume Pattern

iOS-App nutzt App-Lifecycle:

@override
void didChangeAppLifecycleState(AppLifecycleState state) {
  if (state == AppLifecycleState.resumed && _chatId != null) {
    _maybeFetchCatchupViaResume();
  }
}

_maybeFetchCatchupViaResume ruft GET /chat/resume?since=<letztes gesehenes assistant.created> und rendert die neuen Messages als normale Bubbles. Kein neuer Turn, kein LLM-Call.

Heartbeats (Comment-Lines)

PHP ist während eines synchronen Anthropic-cURL-Calls blockiert. Damit die Connection trotzdem lebt:

  • Vor jedem LLM-Call: : llm-start <ts>\n\n bzw : iter-N <ts>\n\n
  • Beim Anthropic-Streaming: jeder text_delta triggert sofort einen message_delta-Event durch unsere Pipe → konstanter Byte-Strom
  • Während Tool-Execution: pcntl-alarm-Watchdog tickt alle 3s mit : tool-<name> <ts>\n\n + tool_progress-Event

Damit ist die längste Idle-Phase im Stream typischerweise < 5s.

Client-Implementation

JavaScript (Browser-Widget):

const source = new EventSource(
  `/chat/stream?chat_id=unified_42&tenant=mecomedia&message=${encodeURIComponent(text)}`
);

source.addEventListener('status', e => showSpinner(JSON.parse(e.data).message));
source.addEventListener('tool_progress', e => {
  const { tool, elapsed } = JSON.parse(e.data);
  setStatusBubble(`${tool} läuft (${Math.round(elapsed)}s)`);
});
source.addEventListener('plan_update', e => {
  const { plan, progress } = JSON.parse(e.data);
  renderPlanCard(plan, progress);
});
source.addEventListener('message_delta', e => {
  const { delta } = JSON.parse(e.data);
  appendDeltaToCurrentBubble(delta);
});
source.addEventListener('message', e => appendToChat(JSON.parse(e.data).content));
source.addEventListener('suggestions', e => renderSuggestions(JSON.parse(e.data).suggestions));
source.addEventListener('error', e => {
  const { code, message, retry_after } = JSON.parse(e.data);
  if (code === 'empty_reply') return; // ignore — erwartetes graceful end
  if (code === 'rate_limit') showToast(`Rate limit, retry in ${retry_after}s`);
  else showError(message);
});
source.addEventListener('done', () => source.close());

Dart (Flutter): nutzt nativen URLSessionDataDelegate auf iOS für SSE-Stabilität. Siehe Mobile App.

Nicht-Streaming Fallback

Wenn dein Client kein SSE unterstützt (z. B. eine server-zu-server-Integration), nutze:

POST /chat/index
Content-Type: application/json

{
  "chat_id": "...",
  "tenant": "...",
  "message": "..."
}

Die Antwort kommt als ein JSON-Objekt, sobald sie komplett ist (ggf. mehrere Minuten). error_code-Feld wenn Fehler.

Proxy-Fallen

  • nginx: proxy_buffering off; auf der /chat/stream- und /chat/resume-Location
  • CloudFlare: funktioniert, aber „Rocket Loader" deaktivieren
  • Mittwald: das 2 KB Padding nach den Headern ist Pflicht — ohne wartet der Proxy auf mehr Bytes bevor er den Stream weiterreicht

Reconnection

  • Mit Last-Event-ID-Header re-connecten — der Server setzt den Start-Counter entsprechend
  • Bei error-Code timeout oder rate_limit: nach retry_after Sekunden retry
  • Bei internal oder auth_provider: nicht automatisch retry — User informieren
  • Wenn die App im Hintergrund war: nicht den Stream resumen, sondern GET /chat/resume?since=... aufrufen — schiebt die persistierten Updates rein, ohne den Turn neu zu starten

Verwandte Doks

  • Mobile App — Flutter-SSE-Implementation, App-Resume
  • Agent-Mode — Plan-Exec-Reflect-Loop, Reference-Pattern, Cost-Cap
  • Async-Jobs — Background-Worker, waitForAsyncJob
Zuletzt aktualisiert: 2026-05-17