AI Aggregator Starter Model – Boilerplate

Below is a minimal, production‑minded starter you can run locally to get a side‑by‑side multi‑model AI aggregator. It includes:

  • Backend (TypeScript + Express): provider adapters (OpenAI, Anthropic, Google), a single /api/aggregate route to fan out requests concurrently, and token usage plumbing (image generation is left as an extension; see section 6).
  • Frontend (Next.js + React + Tailwind): model picker, side‑by‑side answers, a simple Prompt Boost pre‑processor, chat history (in‑memory), and light/dark UI.
  • Env & Security: API key loading, simple rate limit, provider timeouts, CORS, and request schema validation.

Copy the file contents into a folder structure exactly as shown. Replace the placeholder API keys in apps/api/.env and set the API base URL in apps/web/.env.local.


1) Project Structure

ai-aggregator/
├─ apps/
│  ├─ web/               # Next.js (frontend)
│  └─ api/               # Express (backend)
├─ package.json          # Root scripts
├─ pnpm-workspace.yaml   # or yarn/npm workspaces
└─ README.md

2) Root workspace files

package.json

{
  "name": "ai-aggregator",
  "private": true,
  "workspaces": ["apps/web", "apps/api"],
  "scripts": {
    "dev": "concurrently -n API,WEB -c blue,green \"pnpm --filter @app/api dev\" \"pnpm --filter @app/web dev\"",
    "build": "pnpm -r build",
    "start": "concurrently -n API,WEB -c blue,green \"pnpm --filter @app/api start\" \"pnpm --filter @app/web start\""
  },
  "devDependencies": {
    "concurrently": "^9.0.0",
    "pnpm": "^9.0.0"
  }
}

pnpm-workspace.yaml

packages:
  - "apps/*"

README.md

AI Aggregator (AI Fiesta‑like) – Starter

Quickstart
1. `pnpm install` (or npm/yarn)
2. Create `apps/api/.env` and `apps/web/.env.local` (see examples below)
3. `pnpm dev`
4. Open http://localhost:3000

Notes
- This is a minimal starter. Harden auth, add DB, and refine UX before production.
- Bring your own API keys for providers you enable.

3) Backend: apps/api

apps/api/package.json

{
  "name": "@app/api",
  "type": "module",
  "main": "dist/index.js",
  "scripts": {
    "dev": "tsx src/index.ts",
    "build": "tsc -p tsconfig.json",
    "start": "node dist/index.js"
  },
  "dependencies": {
    "anthropic": "^0.32.0",
    "cors": "^2.8.5",
    "dotenv": "^16.4.5",
    "express": "^4.19.2",
    "google-generative-ai": "^0.16.0",
    "openai": "^4.55.0",
    "zod": "^3.23.8"
  },
  "devDependencies": {
    "tsx": "^4.16.0",
    "typescript": "^5.5.4"
  }
}

apps/api/tsconfig.json

{
  "compilerOptions": {
    "target": "ES2022",
    "module": "ES2022",
    "moduleResolution": "bundler",
    "outDir": "dist",
    "rootDir": "src",
    "strict": true,
    "esModuleInterop": true,
    "skipLibCheck": true
  },
  "include": ["src"]
}

apps/api/.env (example)

PORT=4000
# set only those you will use
OPENAI_API_KEY=sk-...
ANTHROPIC_API_KEY=...
GEMINI_API_KEY=...
# CORS allowlist (comma separated)
CORS_ORIGINS=http://localhost:3000

apps/api/src/types.ts

export type ProviderId = "openai" | "anthropic" | "gemini";

export interface ProviderResponse {
  provider: ProviderId;
  output: string;
  usage?: { inputTokens?: number; outputTokens?: number; costUSD?: number };
  latencyMs: number;
  error?: string;
}

apps/api/src/providers/openai.ts

import OpenAI from "openai";

const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

export async function openaiChat(prompt: string, system?: string) {
  const started = Date.now();
  try {
    const res = await client.chat.completions.create({
      model: "gpt-4o-mini", // pick your model
      messages: [
        ...(system ? [{ role: "system" as const, content: system }] : []),
        { role: "user" as const, content: prompt }
      ],
      temperature: 0.7,
    });
    const text = res.choices?.[0]?.message?.content ?? "";
    const usage = res.usage
      ? { inputTokens: res.usage.prompt_tokens, outputTokens: res.usage.completion_tokens }
      : undefined;
    return { output: text, usage, latencyMs: Date.now() - started } as const;
  } catch (err: any) {
    return { output: "", latencyMs: Date.now() - started, error: err?.message ?? String(err) } as const;
  }
}

apps/api/src/providers/anthropic.ts

import Anthropic from "@anthropic-ai/sdk";

const client = new Anthropic({ apiKey: process.env.ANTHROPIC_API_KEY });

export async function anthropicChat(prompt: string, system?: string) {
  const started = Date.now();
  try {
    const res = await client.messages.create({
      model: "claude-3-5-sonnet-20240620",
      system,
      max_tokens: 1024,
      messages: [{ role: "user", content: prompt }]
    });
    const text = res.content?.[0]?.type === "text" ? res.content[0].text : JSON.stringify(res.content);
    return { output: text, latencyMs: Date.now() - started } as const;
  } catch (err: any) {
    return { output: "", latencyMs: Date.now() - started, error: err?.message ?? String(err) } as const;
  }
}

apps/api/src/providers/gemini.ts

import { GoogleGenerativeAI } from "@google/generative-ai";

const client = new GoogleGenerativeAI(process.env.GEMINI_API_KEY || "");

export async function geminiChat(prompt: string, system?: string) {
  const started = Date.now();
  try {
    const model = client.getGenerativeModel({ model: "gemini-1.5-pro" });
    const res = await model.generateContent({ contents: [{ role: "user", parts: [{ text: (system ? system + "\n" : "") + prompt }] }] });
    const text = res.response.text();
    return { output: text, latencyMs: Date.now() - started } as const;
  } catch (err: any) {
    return { output: "", latencyMs: Date.now() - started, error: err?.message ?? String(err) } as const;
  }
}

apps/api/src/providers/index.ts

import { openaiChat } from "./openai";
import { anthropicChat } from "./anthropic";
import { geminiChat } from "./gemini";
import type { ProviderId } from "../types";

export const adapters: Record<ProviderId, (prompt: string, system?: string) => Promise<{ output: string; latencyMs: number; error?: string }>> = {
  openai: openaiChat,
  anthropic: anthropicChat,
  gemini: geminiChat,
};

apps/api/src/index.ts

import "dotenv/config";
import express from "express";
import cors from "cors";
import { z } from "zod";
import { adapters } from "./providers";
import type { ProviderId, ProviderResponse } from "./types";

const app = express();
const PORT = Number(process.env.PORT || 4000);

const allow = (process.env.CORS_ORIGINS || "").split(",").filter(Boolean);
app.use(cors({ origin: (origin, cb) => cb(null, !origin || allow.includes(origin)), credentials: true }));
app.use(express.json({ limit: "1mb" }));

// Basic rate-limit (per-IP, memory only; swap with Redis for prod)
const bucket = new Map<string, { tokens: number; reset: number }>();
function rateLimit(ip: string, limit = 30, windowMs = 60_000) {
  const now = Date.now();
  const b = bucket.get(ip) || { tokens: limit, reset: now + windowMs };
  if (now > b.reset) { b.tokens = limit; b.reset = now + windowMs; }
  if (b.tokens <= 0) return false;
  b.tokens -= 1; bucket.set(ip, b); return true;
}

const ReqSchema = z.object({
  prompt: z.string().min(1),
  system: z.string().optional(),
  providers: z.array(z.enum(["openai", "anthropic", "gemini"]))
    .min(1)
    .default(["openai", "anthropic", "gemini"]),
});

app.post("/api/aggregate", async (req, res) => {
  if (!rateLimit(req.ip)) return res.status(429).json({ error: "Too many requests" });
  const parsed = ReqSchema.safeParse(req.body);
  if (!parsed.success) return res.status(400).json({ error: parsed.error.flatten() });

  const { prompt, system, providers } = parsed.data;

  const tasks = providers.map(async (p) => {
    const fn = adapters[p as ProviderId];
    const r = await fn(prompt, system);
    const resp: ProviderResponse = { provider: p as ProviderId, ...r };
    return resp;
  });

  // Run all in parallel with a hard timeout per provider
  const withTimeout = (p: Promise<ProviderResponse>, provider: ProviderId, ms = 25_000): Promise<ProviderResponse> =>
    new Promise((resolve) => {
      const t = setTimeout(() => resolve({ provider, output: "", latencyMs: ms, error: "timeout" }), ms);
      p.then((r) => { clearTimeout(t); resolve(r); })
        .catch((e) => { clearTimeout(t); resolve({ provider, output: "", latencyMs: 0, error: String(e) }); });
    });

  const results = await Promise.all(tasks.map((t, i) => withTimeout(t, providers[i] as ProviderId)));
  res.json({ results, ts: Date.now() });
});

app.get("/health", (_, res) => res.json({ ok: true }));

app.listen(PORT, () => console.log(`API listening on http://localhost:${PORT}`));

4) Frontend: apps/web

apps/web/package.json

{
  "name": "@app/web",
  "private": true,
  "scripts": {
    "dev": "next dev -p 3000",
    "build": "next build",
    "start": "next start -p 3000"
  },
  "dependencies": {
    "class-variance-authority": "^0.7.0",
    "clsx": "^2.1.1",
    "next": "^14.2.5",
    "react": "^18.3.1",
    "react-dom": "^18.3.1",
    "tailwindcss": "^3.4.7",
    "zod": "^3.23.8"
  },
  "devDependencies": {
    "autoprefixer": "^10.4.19",
    "postcss": "^8.4.39",
    "typescript": "^5.5.4"
  }
}

apps/web/next.config.js

/** @type {import('next').NextConfig} */
const nextConfig = {
  reactStrictMode: true,
  experimental: {
    serverActions: {
      bodySizeLimit: '1mb'
    }
  }
};
module.exports = nextConfig;

apps/web/tailwind.config.js

/** @type {import('tailwindcss').Config} */
module.exports = {
  content: ["./app/**/*.{ts,tsx}", "./components/**/*.{ts,tsx}"],
  theme: { extend: {} },
  darkMode: 'class',
  plugins: []
};

apps/web/postcss.config.js

module.exports = { plugins: { tailwindcss: {}, autoprefixer: {} } };

apps/web/.env.local

# Where your API is running
NEXT_PUBLIC_API_BASE=http://localhost:4000

apps/web/app/layout.tsx

import "./globals.css";
import { ReactNode } from "react";

export default function RootLayout({ children }: { children: ReactNode }) {
  return (
    <html lang="en" className="h-full">
      <body className="min-h-screen bg-neutral-50 text-neutral-900 dark:bg-neutral-950 dark:text-neutral-50">
        <div className="max-w-6xl mx-auto p-4">{children}</div>
      </body>
    </html>
  );
}

apps/web/app/globals.css

@tailwind base;
@tailwind components;
@tailwind utilities;

/* Simple cards */
.card { @apply rounded-2xl shadow-sm border border-neutral-200 dark:border-neutral-800 p-4 bg-white dark:bg-neutral-900; }
.btn { @apply inline-flex items-center justify-center rounded-xl px-4 py-2 border border-neutral-300 dark:border-neutral-700 hover:bg-neutral-100 dark:hover:bg-neutral-800; }
.input { @apply w-full rounded-xl border border-neutral-300 dark:border-neutral-700 bg-white dark:bg-neutral-900 px-3 py-2; }
.badge { @apply text-xs px-2 py-1 rounded-lg bg-neutral-100 dark:bg-neutral-800; }

apps/web/app/page.tsx

'use client';
import { useMemo, useRef, useState } from 'react';

const ALL_PROVIDERS = [
  { id: 'openai', label: 'OpenAI' },
  { id: 'anthropic', label: 'Anthropic' },
  { id: 'gemini', label: 'Gemini' },
] as const;

type ProviderId = typeof ALL_PROVIDERS[number]['id'];

type Result = {
  provider: ProviderId;
  output: string;
  latencyMs: number;
  error?: string;
};

function promptBoost(raw: string) {
  // very simple booster – expand and clarify the ask
  return `You are a helpful, concise expert. If there is ambiguity, ask 1 short clarifying question and then answer.\nTask: ${raw}`;
}

export default function Home() {
  const [prompt, setPrompt] = useState('Explain RAG vs fine-tuning in simple terms.');
  const [system, setSystem] = useState('');
  const [providers, setProviders] = useState<ProviderId[]>(ALL_PROVIDERS.map(p => p.id));
  const [loading, setLoading] = useState(false);
  const [results, setResults] = useState<Result[]>([]);
  const [useBoost, setUseBoost] = useState(true);
  const [history, setHistory] = useState<{ ts: number; prompt: string; results: Result[] }[]>([]);

  const apiBase = process.env.NEXT_PUBLIC_API_BASE || 'http://localhost:4000';
  const controllerRef = useRef<AbortController | null>(null);

  const canRun = useMemo(() => prompt.trim().length > 0 && providers.length > 0, [prompt, providers]);

  async function run() {
    if (!canRun) return;
    setLoading(true);
    setResults([]);
    const boosted = useBoost ? promptBoost(prompt) : prompt;
    controllerRef.current?.abort();
    const ctrl = new AbortController();
    controllerRef.current = ctrl;

    try {
      const res = await fetch(`${apiBase}/api/aggregate`, {
        method: 'POST',
        headers: { 'Content-Type': 'application/json' },
        body: JSON.stringify({ prompt: boosted, system, providers }),
        signal: ctrl.signal,
      });
      const data = await res.json();
      setResults(data.results || []);
      setHistory(h => [{ ts: Date.now(), prompt, results: data.results || [] }, ...h].slice(0, 25));
    } catch (e) {
      console.error(e);
    } finally {
      setLoading(false);
    }
  }

  return (
    <div className="space-y-6">
      <header className="flex items-center justify-between">
        <h1 className="text-2xl font-semibold">AI Aggregator Starter</h1>
        <button className="btn" onClick={() => document.documentElement.classList.toggle('dark')}>Toggle Theme</button>
      </header>

      <div className="grid md:grid-cols-[2fr,1fr] gap-4">
        <div className="card space-y-3">
          <label className="text-sm font-medium">Your prompt</label>
          <textarea className="input h-32" value={prompt} onChange={e => setPrompt(e.target.value)} placeholder="Ask anything..." />

          <label className="text-sm font-medium">System instructions (optional)</label>
          <input className="input" value={system} onChange={e => setSystem(e.target.value)} placeholder="e.g., You are a senior TypeScript engineer." />

          <div className="flex items-center gap-4">
            <label className="inline-flex items-center gap-2">
              <input type="checkbox" checked={useBoost} onChange={e => setUseBoost(e.target.checked)} />
              <span className="text-sm">Prompt Boost</span>
            </label>
            <div className="text-xs opacity-70">(adds lightweight structure to your prompt)</div>
          </div>

          <div className="flex items-center gap-2 flex-wrap">
            {ALL_PROVIDERS.map(p => (
              <label key={p.id} className="btn gap-2 cursor-pointer select-none">
                <input
                  type="checkbox"
                  checked={providers.includes(p.id)}
                  onChange={(e) => setProviders(prev => e.target.checked ? [...prev, p.id] : prev.filter(x => x !== p.id))}
                />
                {p.label}
              </label>
            ))}
          </div>

          <div className="flex items-center gap-2">
            <button className="btn" disabled={!canRun || loading} onClick={run}>{loading ? 'Running…' : 'Run across models'}</button>
            {loading && (
              <button className="btn" onClick={() => controllerRef.current?.abort()}>Cancel</button>
            )}
          </div>
        </div>

        <aside className="card space-y-3">
          <div className="text-sm font-medium">Recent</div>
          <div className="space-y-2 max-h-80 overflow-auto">
            {history.length === 0 && <div className="text-sm opacity-60">No history yet.</div>}
            {history.map((h, i) => (
              <button key={i} className="w-full text-left hover:underline" onClick={() => { setPrompt(h.prompt); setResults(h.results); }}>
                <div className="text-sm line-clamp-1">{h.prompt}</div>
                <div className="text-xs opacity-60">{new Date(h.ts).toLocaleString()}</div>
              </button>
            ))}
          </div>
        </aside>
      </div>

      <section className="grid md:grid-cols-3 gap-4">
        {results.map(r => (
          <div key={r.provider} className="card space-y-2">
            <div className="flex items-center justify-between">
              <div className="font-medium">{r.provider.toUpperCase()}</div>
              <span className="badge">{r.latencyMs} ms</span>
            </div>
            {r.error ? (
              <div className="text-sm text-red-500">Error: {r.error}</div>
            ) : (
              <pre className="text-sm whitespace-pre-wrap">{r.output}</pre>
            )}
          </div>
        ))}
      </section>

      <footer className="opacity-70 text-xs">Tip: Add more providers, streaming, auth, and usage metering for production.</footer>
    </div>
  );
}

5) Optional: NGINX reverse proxy (dev convenience)

server {
  listen 80;
  server_name localhost;

  location /api/ {
    proxy_pass http://localhost:4000/;
  }
  location / {
    proxy_pass http://localhost:3000/;
  }
}

6) Other details (not mandatory, but important to know)

  • Auth & Billing: NextAuth.js + Stripe or Razorpay (UPI) for India‑first plans.
  • DB: Postgres + Prisma for users, projects, histories, token usage.
  • Streaming: Server‑Sent Events per provider for token‑by‑token updates.
  • Image & Audio: Add image gen endpoints (e.g., OpenAI Images, Stability) and STT (e.g., Whisper) with upload limits.
  • Projects/Modes: Persist prompt presets (Marketing, Coding, Research) and team workspaces.
  • Token Accounting: Store per‑user budgets; compute cost via model pricing tables and enforce caps (a minimal cost sketch follows this list).
  • Observability: pino logs, OpenTelemetry, and request IDs.
  • Hardening: input size caps, abuse detection, provider failover.
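
For the token accounting bullet, here is a minimal cost estimation sketch. The pricing numbers are placeholders for illustration only; load real, current per-model prices into your own pricing table.

// Hypothetical pricing table (USD per 1M tokens): values are illustrative, not current prices.
const PRICING: Record<string, { inputPerM: number; outputPerM: number }> = {
  "gpt-4o-mini": { inputPerM: 0.15, outputPerM: 0.6 },
};

export function estimateCostUSD(model: string, inputTokens = 0, outputTokens = 0): number {
  const price = PRICING[model];
  if (!price) return 0; // unknown model: report zero rather than guessing
  return (inputTokens / 1_000_000) * price.inputPerM + (outputTokens / 1_000_000) * price.outputPerM;
}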

This starter is intentionally compact but gives you a working base: a UI to select models, a fan‑out API that calls multiple LLMs in parallel, and a side‑by‑side comparison view.

The sections above cover the provider fan‑out API. What follows is a separate, self‑contained running example (a FastAPI backend with a Next.js frontend) that adds WebSocket streaming and Postgres‑backed chat history.

Backend (FastAPI)

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, Depends
from fastapi.middleware.cors import CORSMiddleware
from sqlalchemy import create_engine, Column, Integer, String, Text, ForeignKey, DateTime
from sqlalchemy.orm import sessionmaker, declarative_base, relationship
from datetime import datetime
import os
from openai import AsyncOpenAI

openai_client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment

app = FastAPI()

# CORS setup
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Database setup
DATABASE_URL = os.getenv("DATABASE_URL", "postgresql://user:password@localhost:5432/aifiesta")
engine = create_engine(DATABASE_URL)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

class ChatSession(Base):
    __tablename__ = "chat_sessions"
    id = Column(Integer, primary_key=True, index=True)
    title = Column(String, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    messages = relationship("Message", back_populates="session")

class Message(Base):
    __tablename__ = "messages"
    id = Column(Integer, primary_key=True, index=True)
    session_id = Column(Integer, ForeignKey("chat_sessions.id"))
    role = Column(String, nullable=False)
    content = Column(Text, nullable=False)
    created_at = Column(DateTime, default=datetime.utcnow)
    session = relationship("ChatSession", back_populates="messages")

Base.metadata.create_all(bind=engine)

# Dependency
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

@app.post("/start-session")
def start_session(title: str, db=Depends(get_db)):
    session = ChatSession(title=title)
    db.add(session)
    db.commit()
    db.refresh(session)
    return {"session_id": session.id, "title": session.title}

@app.get("/history/{session_id}")
def get_history(session_id: int, db=Depends(get_db)):
    messages = db.query(Message).filter(Message.session_id == session_id).order_by(Message.created_at).all()
    return [{"role": m.role, "content": m.content, "created_at": m.created_at} for m in messages]

@app.websocket("/ws/chat/{session_id}")
async def chat_ws(websocket: WebSocket, session_id: int, db=Depends(get_db)):
    await websocket.accept()
    while True:
        try:
            data = await websocket.receive_text()
        except WebSocketDisconnect:
            break
        # Save user message
        msg = Message(session_id=session_id, role="user", content=data)
        db.add(msg)
        db.commit()

        # Stream response from OpenAI (openai>=1.0 async client)
        response_text = ""
        stream = await openai_client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": data}],
            stream=True,
        )
        async for chunk in stream:
            delta = chunk.choices[0].delta.content or ""
            if delta:
                response_text += delta
                await websocket.send_text(delta)

        # Save assistant response
        msg = Message(session_id=session_id, role="assistant", content=response_text)
        db.add(msg)
        db.commit()
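
To run this backend (assuming the code above is saved as main.py and dependencies such as fastapi, uvicorn, sqlalchemy, psycopg2-binary, and openai are installed); the frontend below connects to port 8000:

uvicorn main:app --reload --port 8000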

Frontend (Next.js + React with WebSocket streaming)

'use client'
import { useState, useEffect } from 'react'

export default function Chat({ sessionId }: { sessionId: number }) {
  const [socket, setSocket] = useState<WebSocket | null>(null)
  const [messages, setMessages] = useState<{role:string, content:string}[]>([])
  const [input, setInput] = useState("")

  useEffect(() => {
    const ws = new WebSocket(`ws://localhost:8000/ws/chat/${sessionId}`)
    ws.onmessage = (event) => {
      setMessages((prev) => {
        const last = prev[prev.length - 1]
        if (last?.role === 'assistant') {
          // append the streamed chunk without mutating existing state
          return [...prev.slice(0, -1), { ...last, content: last.content + event.data }]
        }
        return [...prev, { role: 'assistant', content: event.data }]
      })
    }
    setSocket(ws)
    return () => ws.close()
  }, [sessionId])

  const sendMessage = () => {
    if (socket && input.trim()) {
      socket.send(input)
      setMessages((prev) => [...prev, { role: 'user', content: input }])
      setInput("")
    }
  }

  return (
    <div className="p-4 max-w-2xl mx-auto">
      <div className="border p-2 h-96 overflow-y-scroll rounded">
        {messages.map((m, i) => (
          <div key={i} className={m.role === 'user' ? 'text-right' : 'text-left'}>
            <span className={m.role === 'user' ? 'bg-blue-200 px-2 py-1 rounded' : 'bg-gray-200 px-2 py-1 rounded'}>
              {m.content}
            </span>
          </div>
        ))}
      </div>
      <div className="flex mt-2">
        <input
          className="border flex-1 rounded-l px-2"
          value={input}
          onChange={(e) => setInput(e.target.value)}
          onKeyDown={(e) => e.key === 'Enter' && sendMessage()}
        />
        <button className="bg-blue-500 text-white px-4 rounded-r" onClick={sendMessage}>Send</button>
      </div>
    </div>
  )
}

This alternative stack adds streaming AI responses via WebSockets and stores all chat history in Postgres.


7) Streaming responses (SSE) + Postgres chat history (detailed implementation)

This section adds two major features:

  1. Streaming responses from providers to the frontend using Server‑Sent Events (SSE). The backend fans out to providers and streams partial tokens/results to the client as they arrive. The client renders streaming tokens in real time.
  2. Chat history persisted in Postgres using Prisma. Conversations, messages, and per‑message metadata (provider, latency, cost estimates) are stored and can be queried to populate recent history.

High level flow

  • Frontend sends a POST /api/stream with prompt, system, providers and a sessionId (or createSession: true).
  • Backend creates/ensures a Session row, creates a Message row for the user prompt, then starts provider adapters in parallel.
  • For each provider we open a streaming connection (provider SDK or HTTP streaming) and as tokens/chunks arrive we:
    • write SSE events to the client with { provider, chunk, done?: boolean } (see the wire‑format example after this list)
    • append partial text to a MessageChunk (DB) row (optional) or buffer in memory to persist final text at the end
  • When provider finishes, backend updates the Message rows with final text and metadata (latency, cost, error) and emits a final SSE event.
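
For reference, a single provider's streamed output on the wire is a series of SSE frames; the event names and payload fields below mirror the ones used by the streaming route later in this section (the values are illustrative):

event: chunk
data: {"provider":"openai","chunk":"Retrieval-aug","messageId":"msg_abc"}

event: chunk
data: {"provider":"openai","chunk":"mented generation…","messageId":"msg_abc"}

event: done
data: {"provider":"openai","text":"Retrieval-augmented generation…","messageId":"msg_abc"}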

NOTE: Real provider streaming requires provider SDK support for streaming. Example below shows a generalized approach: for OpenAI/other providers you will replace the adapter streaming implementation with the provider’s streaming API.

Database (Prisma) — apps/api/prisma/schema.prisma

generator client { provider = "prisma-client-js" }

datasource db {
  provider = "postgresql"
  url      = env("DATABASE_URL")
}

model User {
  id        String   @id @default(cuid())
  email     String?  @unique
  sessions  Session[]
  createdAt DateTime @default(now())
}

model Session {
  id         String    @id @default(cuid())
  user       User?     @relation(fields: [userId], references: [id])
  userId     String?
  title      String?
  messages   Message[]
  createdAt  DateTime  @default(now())
  updatedAt  DateTime  @updatedAt
}

model Message {
  id            String         @id @default(cuid())
  session       Session        @relation(fields: [sessionId], references: [id])
  sessionId     String
  role          String         // 'user' | 'assistant' | 'system'
  provider      String?        // e.g., 'openai' if this message is provider-generated
  content       String?        @db.Text
  partial       String?        @db.Text
  tokens        Int?
  costCents     Int?           // store micro-costs if required
  createdAt     DateTime       @default(now())
  updatedAt     DateTime       @updatedAt
  chunks        MessageChunk[]
}

model MessageChunk {
  id        String   @id @default(cuid())
  message   Message  @relation(fields: [messageId], references: [id])
  messageId String
  provider  String
  index     Int
  text      String   @db.Text
  createdAt DateTime @default(now())
}

Add DATABASE_URL to apps/api/.env:

DATABASE_URL=postgresql://USER:PASSWORD@localhost:5432/ai_aggregator_db

Run:

cd apps/api
pnpm prisma migrate dev --name add_chat_history
pnpm prisma generate

Backend: dependencies

Add to apps/api/package.json (put prisma in devDependencies and @prisma/client in dependencies):

"prisma": "^5.0.0",
"@prisma/client": "^5.0.0"

Then pnpm install and run pnpm prisma generate (see above).

Backend: Prisma client initialization apps/api/src/db.ts

import { PrismaClient } from '@prisma/client';
export const db = new PrismaClient();

Backend: New streaming route with SSE apps/api/src/stream.ts

Create a new module that exposes setupStreamingRoutes(app) and mounts /api/stream.

Key points:

  • Use res.writeHead(200, { 'Content-Type': 'text/event-stream', 'Cache-Control': 'no-cache', Connection: 'keep-alive' })
  • Write SSE frames like res.write(`event: chunk\ndata: ${JSON.stringify(payload)}\n\n`)
  • Call res.flushHeaders?.() after setting headers, and res.flush?.() per write when the compression middleware is in use.

import express from 'express';
import { db } from './db';
import { adapters } from './providers';
import type { ProviderId } from './types';

export function setupStreamingRoutes(app: express.Express) {
  app.post('/api/stream', async (req, res) => {
    // Basic validation
    const { prompt, system, providers = ['openai'], sessionId, createSession } = req.body;
    if (!prompt) return res.status(400).json({ error: 'prompt required' });

    // Ensure session
    let session;
    if (sessionId) {
      session = await db.session.findUnique({ where: { id: sessionId } });
    }
    if (!session && createSession) {
      session = await db.session.create({ data: { title: prompt.slice(0, 120) } });
    }
    if (!session) return res.status(400).json({ error: 'sessionId missing and createSession not set' });

    // Insert user message (the prompt)
    const userMessage = await db.message.create({ data: { sessionId: session.id, role: 'user', content: prompt } });

    // Setup SSE
    res.setHeader('Content-Type', 'text/event-stream');
    res.setHeader('Cache-Control', 'no-cache');
    res.setHeader('Connection', 'keep-alive');
    res.flushHeaders?.();

    // Helper to send SSE event
    const sendEvent = (event: string, data: any) => {
      res.write(`event: ${event}\n`);
      res.write(`data: ${JSON.stringify(data)}\n\n`);
      (res as any).flush?.(); // flush() is available when the compression middleware is used
    };

    // For each provider, create a Message placeholder for assistant
    const assistantMessages = await Promise.all((providers as ProviderId[]).map(async p => {
      return db.message.create({ data: { sessionId: session!.id, role: 'assistant', provider: p, partial: '' } });
    }));

    // For cleanup on client disconnect
    req.on('close', async () => {
      try { sendEvent('end', { reason: 'client_closed' }); } catch (e) {}
      res.end();
    });

    // Start provider streams in parallel
    providers.forEach(async (prov: ProviderId, idx) => {
      const message = assistantMessages[idx];
      try {
        // If adapter exposes streaming API, use it. Here we show a generic interface:
        const adapter = adapters[prov];
        if (!adapter) {
          sendEvent('error', { provider: prov, error: 'adapter_not_found' });
          await db.message.update({ where: { id: message.id }, data: { content: '', partial: '', updatedAt: new Date() } });
          return;
        }

        // `stream` is a hypothetical helper added to each provider file that accepts `onChunk`, `onDone`, and `onError` callbacks.
        // We'll show an example implementation for the openai adapter below.

        let chunkIndex = 0;
        await adapter.stream(prompt, {
          onChunk: async (chunkText: string) => {
            // Persist each chunk as a MessageChunk row (Prisma's `push` only works on scalar lists, not String columns)
            await db.messageChunk.create({ data: { messageId: message.id, provider: prov, index: chunkIndex++, text: chunkText } }).catch(() => {});
            sendEvent('chunk', { provider: prov, chunk: chunkText, messageId: message.id });
          },
          onDone: async (finalText: string, meta: any) => {
            // Save final content & meta, and clear the partial buffer
            await db.message.update({ where: { id: message.id }, data: { content: finalText, partial: null, tokens: meta?.tokens, costCents: meta?.costCents } });
            sendEvent('done', { provider: prov, text: finalText, meta, messageId: message.id });
          },
          onError: async (err: any) => {
            await db.message.update({ where: { id: message.id }, data: { content: '', partial: null } });
            sendEvent('error', { provider: prov, error: String(err), messageId: message.id });
          }
        });

      } catch (err: any) {
        sendEvent('error', { provider: prov, error: String(err) });
      }
    });

    // Keep the connection open until all providers finish. In a fuller implementation, track per-provider completion
    // (e.g., Promise.all over the provider streams) and call res.end() after the final events; otherwise the client closes the connection.
  });
}

Note: In the snippet above we used await adapter.stream() which is a new optional function you should add to each provider adapter. If a provider doesn’t support streaming, you can emulate streaming by chunking the final response into N pieces and sending them with short setTimeouts — but this is only a UX shim.
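
A minimal sketch of that shim, assuming the same onChunk callback shape used above (piece size and delay are arbitrary):

// UX shim only: fake streaming by emitting slices of an already-complete response.
export async function emulateStream(
  fullText: string,
  onChunk: (piece: string) => void,
  pieceSize = 40,
  delayMs = 30
): Promise<void> {
  for (let i = 0; i < fullText.length; i += pieceSize) {
    onChunk(fullText.slice(i, i + pieceSize));
    await new Promise((resolve) => setTimeout(resolve, delayMs)); // short pause between pieces
  }
}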

Provider streaming adapter example (OpenAI) apps/api/src/providers/openai.ts

import OpenAI from 'openai';
const client = new OpenAI({ apiKey: process.env.OPENAI_API_KEY });

export async function openaiStream(prompt: string, callbacks: { onChunk: (s: string) => void; onDone: (s: string, meta?: any) => void; onError: (err: any) => void }) {
  try {
    const stream = await client.chat.completions.create({
      model: 'gpt-4o-mini',
      messages: [{ role: 'user', content: prompt }],
      temperature: 0.7,
      stream: true
    });

    let full = '';
    for await (const part of stream) {
      // With the OpenAI Node SDK v4, streamed chat chunks expose text at choices[0].delta.content
      const chunk = part.choices?.[0]?.delta?.content ?? '';
      if (chunk) {
        full += chunk;
        callbacks.onChunk(chunk);
      }
    }

    // Streamed chunks don't include token usage by default; pass a placeholder and fill it in if you track usage separately
    callbacks.onDone(full, { tokens: 0 });
  } catch (err) {
    callbacks.onError(err);
  }
}

If your provider SDK doesn’t support streaming via for await, you will need to use fetch and read the ReadableStream chunks — or fall back to the chunking shim.

Frontend: streaming client (Next.js) changes apps/web/app/page.tsx

  • Add a stream function that opens an EventSource-like connection using fetch and ReadableStream (EventSource can’t send body in POST, so we’ll use fetch and parse SSE from the response stream), or use the EventSource polyfill if you implement GET /sse?session=....

Below is a minimal client-side streaming reader using fetch + SSE parser:

async function streamResponse({ prompt, system, providers, sessionId }: { prompt: string; system?: string; providers: string[]; sessionId?: string }) {
  const resp = await fetch(`${apiBase}/api/stream`, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify({ prompt, system, providers, sessionId, createSession: !sessionId }),
  });
  if (!resp.ok) throw new Error('stream failed');

  const reader = resp.body!.getReader();
  const decoder = new TextDecoder();
  let buffer = '';

  while (true) {
    const { value, done } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });

    // SSE messages are separated by double newline. Parse them.
    let idx;
    while ((idx = buffer.indexOf('\n\n')) !== -1) {
      const raw = buffer.slice(0, idx);
      buffer = buffer.slice(idx + 2);

      // Each frame may have `event: <name>` and `data: <json>` lines. Extract them.
      const lines = raw.split(/\r?\n/);
      let event = 'message';
      let data = '';
      for (const line of lines) {
        if (line.startsWith('event:')) event = line.replace('event:', '').trim();
        if (line.startsWith('data:')) data += line.replace('data:', '').trim();
      }
      try {
        const parsed = JSON.parse(data);
        // handle events: 'chunk', 'done', 'error', 'end'
        if (event === 'chunk') {
          // update UI: append parsed.chunk to provider partial
        } else if (event === 'done') {
          // mark provider finished and store final text
        } else if (event === 'error') {
          // show provider error
        }
      } catch (e) {
        console.error('sse parse error', e, data);
      }
    }
  }
}

In the UI, you’ll maintain an object keyed by provider that stores partial, final, status etc. Render partial as it grows and replace with final on done.
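
One possible shape for that state (names here are assumptions, not part of the starter):

type ProviderStream = {
  partial: string;                        // text accumulated from 'chunk' events
  final?: string;                         // set when the 'done' event arrives
  status: 'streaming' | 'done' | 'error';
  error?: string;
};
type StreamState = Record<string, ProviderStream>; // keyed by provider id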

Frontend: persist & fetch history

Add API endpoints to the backend to fetch sessions and messages:

  • GET /api/sessions — list sessions with last message preview.
  • GET /api/sessions/:id/messages — fetch messages for a session.

These are simple db.session.findMany() and db.message.findMany() calls (returning content, partial, provider, timestamps).

On the frontend, the Recent panel will call GET /api/sessions and GET /api/sessions/:id/messages to populate history. Clicking a session loads its messages into the chat UI. When streaming, use the sessionId to append new assistant messages to that session.
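
A minimal client-side sketch for that wiring (apiBase as defined earlier; response shapes assumed from the routes below):

async function loadSessions() {
  const res = await fetch(`${apiBase}/api/sessions`);
  const { sessions } = await res.json();
  return sessions; // render into the Recent panel
}

async function loadMessages(sessionId: string) {
  const res = await fetch(`${apiBase}/api/sessions/${sessionId}/messages`);
  const { messages } = await res.json();
  return messages; // load into the chat UI when a session is clicked
}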

Example: GET /api/sessions route

app.get('/api/sessions', async (req, res) => {
  const sessions = await db.session.findMany({ include: { messages: { take: 1, orderBy: { createdAt: 'desc' } } }, orderBy: { updatedAt: 'desc' }, take: 50 });
  res.json({ sessions });
});
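
A matching sketch for the per-session messages route (field names follow the Prisma schema above):

app.get('/api/sessions/:id/messages', async (req, res) => {
  const messages = await db.message.findMany({
    where: { sessionId: req.params.id },
    orderBy: { createdAt: 'asc' },
    select: { id: true, role: true, provider: true, content: true, partial: true, createdAt: true },
  });
  res.json({ messages });
});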

Notes & production considerations

  • Atomicity: Avoid frequent DB writes for every tiny token — instead buffer small chunks in memory and write every few hundred ms, or write only final content and save occasional partials. Frequent writes can overwhelm the DB under high concurrency (see the buffered‑write sketch after this list).
  • Scaling SSE: SSE connections are HTTP long‑lived. Use a horizontal scale with sticky sessions or a pub/sub (Redis) system to broadcast streaming chunks to the correct client worker. Alternatively use WebSockets via a managed pub/sub like Pusher, Ably, or a WebSocket server behind a load balancer.
  • Costs & tokens: Save token counts per message. Compute costCents using your own pricing table. For multi‑tenant setups, enforce per‑user budgets server‑side.
  • Security: Authenticate API endpoints and validate session ownership before returning messages.
  • Provider failover: If one provider fails, continue streaming others and surface the error for that provider only.
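
A buffered-write sketch for the atomicity point above (the 500 ms interval is arbitrary; field names follow the Prisma schema):

import { db } from './db';

// Accumulate streamed text in memory and persist the partial on an interval,
// instead of issuing a DB write for every token.
export function createPartialWriter(messageId: string, flushMs = 500) {
  let full = '';
  let dirty = false;
  const timer = setInterval(async () => {
    if (!dirty) return;
    dirty = false;
    await db.message.update({ where: { id: messageId }, data: { partial: full } }).catch(() => {});
  }, flushMs);
  return {
    append(chunk: string) { full += chunk; dirty = true; },
    stop() { clearInterval(timer); }, // call from onDone/onError, then write the final content once
  };
}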

What’s added to the repo (summary)

  • apps/api/prisma/schema.prisma (+ migrations) — Postgres schema for sessions/messages/chunks.
  • apps/api/src/db.ts — Prisma client.
  • apps/api/src/stream.ts — SSE-backed streaming endpoint.
  • apps/api/src/providers/* — optional stream / streaming helpers per provider (e.g., openaiStream).
  • apps/api/src/index.ts — mount setupStreamingRoutes(app) and ensure graceful shutdown.
  • apps/web UI changes — streamResponse() using fetch+ReadableStream SSE parser; UI to render partials and finalize messages; history panel wired to /api/sessions and /api/sessions/:id/messages.

8) Quick run checklist

  1. Install Prisma and dependencies: pnpm install in apps/api.
  2. Set DATABASE_URL in apps/api/.env.
  3. pnpm prisma migrate dev --name add_chat_history.
  4. Start API: pnpm --filter @app/api dev and Web: pnpm --filter @app/web dev.
  5. Open the frontend and ask a question using the Stream action.

That completes the walkthrough: with these pieces in place, you have everything you need to start building your own AI aggregator.