Building an SSE Proxy Backend

This guide shows you how to build a Node.js server that proxies Server-Sent Events from Surflux to multiple client applications. The proxy provides connection pooling, event persistence, and automatic reconnection with state preservation.

Why Build a Proxy?

While connecting directly to Surflux SSE endpoints works for development and testing, production applications should use a backend proxy. Direct SSE subscriptions from frontend applications expose your API key, cannot handle server restarts gracefully, and create redundant connections when multiple users access your application.

A proxy server solves these issues by maintaining a single upstream connection to Surflux and broadcasting events to multiple downstream clients. The proxy persists the last event ID in a database, allowing it to resume exactly where it left off after restarts. Individual client disconnections don't affect the upstream connection or other clients, and you can add authentication, event filtering, or transformation logic before exposing the stream to your users.

Architecture Overview

The proxy maintains one connection to Surflux and broadcasts events to all connected clients. It persists the last event ID in SQLite, allowing resumption after server restarts.

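Schematically:

Surflux /events
      │   single upstream SSE connection
      ▼
  SSE proxy (Express + eventsource, last event ID in SQLite)
      │
      ├──▶ client 1
      ├──▶ client 2
      └──▶ client N
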
Project Setup

Directory Structure

project/
├── api/
│   ├── server.js       # Express server
│   ├── sse.js          # SSE handling
│   ├── constants.js    # Configuration
│   └── db/
│       └── index.js    # Database operations
├── html/               # Static files (optional)
├── package.json
└── .env

Dependencies

{
  "name": "sse-proxy",
  "version": "1.0.0",
  "type": "module",
  "dependencies": {
    "express": "^4.18.2",
    "cors": "^2.8.5",
    "eventsource": "^2.0.2",
    "sqlite3": "^5.1.6",
    "dotenv": "^16.4.5"
  }
}

Install dependencies:

npm install

Environment Configuration

Create a .env file (loaded via dotenv when the server starts):

SURFLUX_API_URL=https://flux.surflux.dev
SURFLUX_API_KEY=your_api_key_here
PORT=8080

Implementation

1. Configuration

Create api/constants.js:

export const SURFLUX_API_URL = process.env.SURFLUX_API_URL;
export const SURFLUX_API_KEY = process.env.SURFLUX_API_KEY;

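Optionally, fail fast when these variables are missing rather than letting the upstream connection fail later (a small addition to constants.js, not strictly required):

// Guard against a missing or incomplete .env file
if (!SURFLUX_API_URL || !SURFLUX_API_KEY) {
  throw new Error('SURFLUX_API_URL and SURFLUX_API_KEY must be set');
}
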
2. Database Layer

Create api/db/index.js for event persistence:

import sqlite3 from 'sqlite3';
import { promisify } from 'util';

const db = new sqlite3.Database('./api/db/demo.db');

const dbRun = promisify(db.run.bind(db));
const dbGet = promisify(db.get.bind(db));

export async function initDatabase() {
  try {
    await dbRun(`
      CREATE TABLE IF NOT EXISTS sse_state (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        last_message_id TEXT,
        endpoint TEXT NOT NULL,
        updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
      )
    `);
    console.log('Database initialized');
  } catch (error) {
    console.error('Error initializing database:', error);
  }
}

export async function getSSEState(endpoint) {
  try {
    const result = await dbGet(
      'SELECT * FROM sse_state WHERE endpoint = ? LIMIT 1',
      [endpoint]
    );

    if (!result) {
      await dbRun(
        'INSERT INTO sse_state (endpoint, last_message_id) VALUES (?, ?)',
        [endpoint, null]
      );

      return {
        endpoint,
        last_message_id: null,
      };
    }

    return result;
  } catch (error) {
    console.error('Error getting SSE state:', error);
    throw error;
  }
}

export async function updateLastMessageId(messageId, endpoint = 'main') {
  try {
    await dbRun(
      'UPDATE sse_state SET last_message_id = ?, updated_at = CURRENT_TIMESTAMP WHERE endpoint = ?',
      [messageId, endpoint]
    );
    console.log(`Updated last message ID: ${messageId}`);
  } catch (error) {
    console.error('Error updating last message ID:', error);
    throw error;
  }
}

export { db };

Key Points:

  • Uses SQLite for simplicity (switch to PostgreSQL/MySQL for production)
  • Stores last event ID per endpoint
  • Automatically initializes state for new endpoints

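If you want to sanity-check the database layer on its own, the functions can be exercised directly (a quick script, assuming it is run from the project root so the relative database path resolves):

import { db, initDatabase, getSSEState, updateLastMessageId } from './api/db/index.js';

await initDatabase();

// First call creates the state row for the endpoint
const state = await getSSEState('events');
console.log(state.last_message_id); // null on the first run

await updateLastMessageId('1755091934020-0', 'events');
console.log((await getSSEState('events')).last_message_id); // "1755091934020-0"

db.close();
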
3. SSE Handler

Create api/sse.js:

import EventSource from 'eventsource';
import { getSSEState, updateLastMessageId } from './db/index.js';
import { SURFLUX_API_KEY, SURFLUX_API_URL } from './constants.js';

export const EVENTS_KEY = 'events';

let upstreamConnection = null;
let clientConnections = new Set();

export async function handleSSE(req, res) {
  console.log('New SSE client connected');

  // Set SSE headers
  res.writeHead(200, {
    'Content-Type': 'text/event-stream',
    'Cache-Control': 'no-cache',
    'Connection': 'keep-alive',
    'Access-Control-Allow-Origin': '*',
    'Access-Control-Allow-Headers': 'Cache-Control',
  });

  // Add client to connections
  clientConnections.add(res);

  // Ensure upstream connection
  try {
    await ensureUpstreamConnection();
  } catch (error) {
    console.error('Error establishing upstream connection:', error);
    res.write(`data: {"type":"error","message":"Failed to connect to upstream"}\n\n`);
  }

  // Send initial connection message
  res.write(`data: {"type":"connected","message":"SSE proxy connected"}\n\n`);

  // Handle client disconnect
  req.on('close', () => {
    console.log('SSE client disconnected');
    clientConnections.delete(res);

    if (clientConnections.size === 0) {
      setTimeout(() => {
        if (clientConnections.size === 0 && upstreamConnection) {
          console.log('No clients connected, closing upstream connection');
          upstreamConnection.close();
          upstreamConnection = null;
        }
      }, 1000);
    }
  });

  req.on('error', () => {
    clientConnections.delete(res);

    if (clientConnections.size === 0) {
      setTimeout(() => {
        if (clientConnections.size === 0 && upstreamConnection) {
          upstreamConnection.close();
          upstreamConnection = null;
        }
      }, 1000);
    }
  });
}

async function ensureUpstreamConnection() {
  if (upstreamConnection && upstreamConnection.readyState === EventSource.OPEN) {
    return upstreamConnection;
  }

  if (upstreamConnection) {
    upstreamConnection.close();
  }

  const state = await getSSEState(EVENTS_KEY);
  let sseUrl = `${SURFLUX_API_URL}/events?api-key=${SURFLUX_API_KEY}`;

  // Resume from last event ID if available
  if (state.last_message_id) {
    sseUrl += `&last-id=${encodeURIComponent(state.last_message_id)}`;
    console.log(`Resuming SSE from message ID: ${state.last_message_id}`);
  } else {
    console.log('Starting fresh SSE connection');
  }

  upstreamConnection = new EventSource(sseUrl);

  upstreamConnection.onopen = () => {
    console.log('Upstream SSE connection opened');
  };

  upstreamConnection.onmessage = async (event) => {
    // Broadcast to all connected clients
    const message = `id: ${event.lastEventId || ''}\ndata: ${event.data}\n\n`;
    clientConnections.forEach(client => {
      try {
        client.write(message);
      } catch (error) {
        console.error('Error writing to client:', error);
        clientConnections.delete(client);
      }
    });

    // Save the last message ID
    if (event.lastEventId) {
      await updateLastMessageId(event.lastEventId, EVENTS_KEY);
    }
  };

  upstreamConnection.onerror = (error) => {
    console.error('Upstream SSE connection error:', {
      message: error.message,
      status: error.status,
    });

    // Notify clients
    const errorMessage = `data: {"type":"error","message":"Connection lost, reconnecting..."}\n\n`;
    clientConnections.forEach(client => {
      try {
        client.write(errorMessage);
      } catch (error) {
        clientConnections.delete(client);
      }
    });

    // Reconnect after 5 seconds
    setTimeout(() => {
      if (clientConnections.size > 0) {
        console.log('Attempting to reconnect upstream SSE...');
        ensureUpstreamConnection();
      }
    }, 5000);
  };

  return upstreamConnection;
}

// Status endpoint
export async function handleApiStatus(req, res) {
  try {
    const state = await getSSEState(EVENTS_KEY);
    // Translate the numeric readyState (0/1/2) into a readable label
    const readyStates = ['CONNECTING', 'OPEN', 'CLOSED'];
    res.json({
      status: 'ok',
      upstreamConnection: upstreamConnection
        ? readyStates[upstreamConnection.readyState]
        : 'CLOSED',
      clientConnections: clientConnections.size,
      lastMessageId: state.last_message_id,
      lastUpdated: state.updated_at,
    });
  } catch (error) {
    res.status(500).json({
      status: 'error',
      error: error.message,
    });
  }
}

// Graceful shutdown
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);

function shutdown() {
  console.log('Shutting down, closing connections...');
  if (upstreamConnection) {
    upstreamConnection.close();
  }
  clientConnections.clear();
  process.exit(0);
}

Key Features:

  • Single upstream connection shared across all clients
  • Automatic resumption using last-id parameter
  • Graceful error handling and reconnection
  • Client connection cleanup
  • Status endpoint for monitoring

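The upstream onmessage handler is also the natural place for the event filtering or transformation mentioned earlier. A minimal sketch, assuming the events are JSON and using a hypothetical type field to decide what to forward:

upstreamConnection.onmessage = async (event) => {
  let payload = event.data;
  try {
    const parsed = JSON.parse(event.data);
    if (parsed.type === 'internal') return; // hypothetical: don't expose internal events
    payload = JSON.stringify(parsed);       // transform the payload here if needed
  } catch {
    // Not JSON; forward unchanged
  }

  const message = `id: ${event.lastEventId || ''}\ndata: ${payload}\n\n`;
  clientConnections.forEach(client => client.write(message));

  if (event.lastEventId) {
    await updateLastMessageId(event.lastEventId, EVENTS_KEY);
  }
};
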
4. Express Server

Create api/server.js:

import 'dotenv/config'; // Load .env before other modules read process.env
import express from 'express';
import cors from 'cors';
import path from 'path';
import { fileURLToPath } from 'url';
import { initDatabase } from './db/index.js';
import { handleApiStatus, handleSSE } from './sse.js';

const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);

const app = express();
const PORT = process.env.PORT || 8080;

// Middleware
app.use(cors());
app.use(express.json());
app.use(express.static(path.join(__dirname, '../html')));

// Routes
app.get('/api/sse', handleSSE);
app.get('/api/status', handleApiStatus);

// Initialize the database before accepting requests
await initDatabase();

app.listen(PORT, () => {
  console.log(`Server running on http://localhost:${PORT}`);
  console.log(`SSE proxy: http://localhost:${PORT}/api/sse`);
  console.log(`Status: http://localhost:${PORT}/api/status`);
});

Running the Server

Start the proxy:

node api/server.js

Expected output:

Database initialized
Server running on http://localhost:8080
SSE proxy: http://localhost:8080/api/sse
Status: http://localhost:8080/api/status

Testing the Proxy

Connect a Client

const eventSource = new EventSource('http://localhost:8080/api/sse');

eventSource.onmessage = (event) => {
  const data = JSON.parse(event.data);
  console.log('Received event:', data);
};

eventSource.onerror = (error) => {
  console.error('Connection error:', error);
};

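You can also watch the raw stream from a terminal (-N disables curl's output buffering):

curl -N http://localhost:8080/api/sse
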
Check Status

curl http://localhost:8080/api/status

Response:

{
  "status": "ok",
  "upstreamConnection": "OPEN",
  "clientConnections": 2,
  "lastMessageId": "1755091934020-0",
  "lastUpdated": "2025-01-15 10:30:00"
}

Production Considerations

Database Upgrade

SQLite works well for development but can experience locking issues under high concurrency. For production deployments, use PostgreSQL or MySQL. Here's how to migrate the database layer:

import { Pool } from 'pg';

const pool = new Pool({
  connectionString: process.env.DATABASE_URL,
});

export async function updateLastMessageId(messageId, endpoint) {
  await pool.query(
    'UPDATE sse_state SET last_message_id = $1, updated_at = NOW() WHERE endpoint = $2',
    [messageId, endpoint]
  );
}

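The initDatabase and getSSEState functions migrate the same way; only the placeholder style ($1, $2 instead of ?) and the timestamp functions change.
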
Security

Add authentication to prevent unauthorized access to your SSE stream:

function authenticate(req, res, next) {
  const apiKey = req.headers['x-api-key'];
  if (!apiKey || apiKey !== process.env.CLIENT_API_KEY) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  next();
}

app.get('/api/sse', authenticate, handleSSE);

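Note that the browser EventSource API cannot set custom request headers, so browser clients need to pass the key another way, for example as a query parameter (a hedged variant of the middleware above, using a hypothetical api-key parameter):

function authenticate(req, res, next) {
  // Accept the key from a header (server-to-server) or query string (browser EventSource)
  const apiKey = req.headers['x-api-key'] || req.query['api-key'];
  if (!apiKey || apiKey !== process.env.CLIENT_API_KEY) {
    return res.status(401).json({ error: 'Unauthorized' });
  }
  next();
}
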
For additional protection, add rate limiting with express-rate-limit (npm install express-rate-limit) to prevent abuse:

import rateLimit from 'express-rate-limit';

const limiter = rateLimit({
  windowMs: 15 * 60 * 1000, // 15-minute window
  max: 100,                 // max requests per IP per window
});

app.use('/api/sse', limiter);

Monitoring

Track key metrics to monitor your proxy's health:

let metrics = {
  eventsReceived: 0,
  eventsBroadcast: 0,
  errors: 0,
  reconnections: 0,
};

app.get('/api/metrics', (req, res) => {
  res.json(metrics);
});

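These counters only become useful once they are incremented where the corresponding events happen. One way to do that is a small shared helper module (a sketch; the file name and function names are illustrative) that sse.js imports and calls from its onmessage and onerror handlers:

// api/metrics.js (hypothetical helper)
export const metrics = {
  eventsReceived: 0,
  eventsBroadcast: 0,
  errors: 0,
  reconnections: 0,
};

// Call from upstreamConnection.onmessage after broadcasting to clients
export function recordBroadcast(clientCount) {
  metrics.eventsReceived += 1;
  metrics.eventsBroadcast += clientCount;
}

// Call from upstreamConnection.onerror before scheduling a reconnect
export function recordError() {
  metrics.errors += 1;
  metrics.reconnections += 1;
}
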
Deployment

Deploy your proxy using Docker for containerized environments:

Create Dockerfile:

FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --production

COPY . .

EXPOSE 8080

CMD ["node", "api/server.js"]

Build and run:

docker build -t sse-proxy .
docker run -p 8080:8080 --env-file .env sse-proxy

Troubleshooting

Connection Drops

If connections drop frequently, increase the reconnection delay in the upstream onerror handler:

// In ensureUpstreamConnection(), raise the retry delay from 5 to 10 seconds
setTimeout(() => {
  if (clientConnections.size > 0) {
    ensureUpstreamConnection();
  }
}, 10000);

Memory Leaks

Periodically log memory usage and connection counts to catch leaks early:

setInterval(() => {
  console.log('Memory usage:', process.memoryUsage());
  console.log('Client connections:', clientConnections.size);
}, 60000);

Database Lock

SQLite can lock under high concurrency. Enable write-ahead logging (WAL), for example at the start of initDatabase():

await dbRun('PRAGMA journal_mode=WAL');

Or switch to PostgreSQL for production.

Summary

This SSE proxy implementation provides:

  • Reliability — Automatic reconnection with state preservation
  • Scalability — Single upstream connection serves multiple clients
  • Resilience — Upstream errors and individual client disconnects don't take down other clients
  • Flexibility — Easy to extend with custom logic

The architecture can be adapted for various use cases, from simple event broadcasting to complex event processing pipelines.

Next Steps