Building an SSE Proxy Backend
This guide shows you how to build a Node.js server that proxies Server-Sent Events from Surflux to multiple client applications. The proxy provides connection pooling, event persistence, and automatic reconnection with state preservation.
Why Build a Proxy?
While connecting directly to Surflux SSE endpoints works for development and testing, production applications should use a backend proxy. Direct SSE subscriptions from frontend applications expose your API key, cannot handle server restarts gracefully, and create redundant connections when multiple users access your application.
A proxy server solves these issues by maintaining a single upstream connection to Surflux and broadcasting events to multiple downstream clients. The proxy persists the last event ID in a database, allowing it to resume exactly where it left off after restarts. Individual client disconnections don't affect the upstream connection or other clients, and you can add authentication, event filtering, or transformation logic before exposing the stream to your users.
Architecture Overview
The proxy maintains one connection to Surflux and broadcasts events to all connected clients. It persists the last event ID in SQLite, allowing resumption after server restarts.
Project Setup
Directory Structure
project/
├── api/
│ ├── server.js # Express server
│ ├── sse.js # SSE handling
│ ├── constants.js # Configuration
│ └── db/
│ └── index.js # Database operations
├── html/ # Static files (optional)
├── package.json
└── .env
Dependencies
{
"name": "sse-proxy",
"version": "1.0.0",
"type": "module",
"dependencies": {
"express": "^4.18.2",
"cors": "^2.8.5",
"eventsource": "^2.0.2",
"sqlite3": "^5.1.6"
}
}
Install dependencies:
npm install
Environment Configuration
Create .env file:
SURFLUX_API_URL=https://flux.surflux.dev
SURFLUX_API_KEY=your_api_key_here
PORT=8080
Implementation
1. Configuration
Create api/constants.js:
export const SURFLUX_API_URL = process.env.SURFLUX_API_URL;
export const SURFLUX_API_KEY = process.env.SURFLUX_API_KEY;
2. Database Layer
Create api/db/index.js for event persistence:
import sqlite3 from 'sqlite3';
import { promisify } from 'util';
const db = new sqlite3.Database('./api/db/demo.db');
const dbRun = promisify(db.run.bind(db));
const dbGet = promisify(db.get.bind(db));
export async function initDatabase() {
try {
await dbRun(`
CREATE TABLE IF NOT EXISTS sse_state (
id INTEGER PRIMARY KEY AUTOINCREMENT,
last_message_id TEXT,
endpoint TEXT NOT NULL,
updated_at DATETIME DEFAULT CURRENT_TIMESTAMP
)
`);
console.log('Database initialized');
} catch (error) {
console.error('Error initializing database:', error);
}
}
export async function getSSEState(endpoint) {
try {
const result = await dbGet(
'SELECT * FROM sse_state WHERE endpoint = ? LIMIT 1',
[endpoint]
);
if (!result) {
await dbRun(
'INSERT INTO sse_state (endpoint, last_message_id) VALUES (?, ?)',
[endpoint, null]
);
return {
endpoint,
last_message_id: null,
};
}
return result;
} catch (error) {
console.error('Error getting SSE state:', error);
throw error;
}
}
export async function updateLastMessageId(messageId, endpoint = 'main') {
try {
await dbRun(
'UPDATE sse_state SET last_message_id = ?, updated_at = CURRENT_TIMESTAMP WHERE endpoint = ?',
[messageId, endpoint]
);
console.log(`Updated last message ID: ${messageId}`);
} catch (error) {
console.error('Error updating last message ID:', error);
throw error;
}
}
export { db };
Key Points:
- Uses SQLite for simplicity (switch to PostgreSQL/MySQL for production)
- Stores last event ID per endpoint
- Automatically initializes state for new endpoints
3. SSE Handler
Create api/sse.js:
import EventSource from 'eventsource';
import { getSSEState, updateLastMessageId } from './db/index.js';
import { SURFLUX_API_KEY, SURFLUX_API_URL } from './constants.js';
export const EVENTS_KEY = 'events';
let upstreamConnection = null;
let clientConnections = new Set();
export async function handleSSE(req, res) {
console.log('New SSE client connected');
// Set SSE headers
res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Access-Control-Allow-Origin': '*',
'Access-Control-Allow-Headers': 'Cache-Control',
});
// Add client to connections
clientConnections.add(res);
// Ensure upstream connection
try {
await ensureUpstreamConnection();
} catch (error) {
console.error('Error establishing upstream connection:', error);
res.write(`data: {"type":"error","message":"Failed to connect to upstream"}\n\n`);
}
// Send initial connection message
res.write(`data: {"type":"connected","message":"SSE proxy connected"}\n\n`);
// Handle client disconnect
req.on('close', () => {
console.log('SSE client disconnected');
clientConnections.delete(res);
if (clientConnections.size === 0) {
setTimeout(() => {
if (clientConnections.size === 0 && upstreamConnection) {
console.log('No clients connected, closing upstream connection');
upstreamConnection.close();
upstreamConnection = null;
}
}, 1000);
}
});
req.on('error', () => {
clientConnections.delete(res);
if (clientConnections.size === 0) {
setTimeout(() => {
if (clientConnections.size === 0 && upstreamConnection) {
upstreamConnection.close();
upstreamConnection = null;
}
}, 1000);
}
});
}
async function ensureUpstreamConnection() {
if (upstreamConnection && upstreamConnection.readyState === EventSource.OPEN) {
return upstreamConnection;
}
if (upstreamConnection) {
upstreamConnection.close();
}
const state = await getSSEState(EVENTS_KEY);
let sseUrl = `${SURFLUX_API_URL}/events?api-key=${SURFLUX_API_KEY}`;
// Resume from last event ID if available
if (state.last_message_id) {
sseUrl += `&last-id=${encodeURIComponent(state.last_message_id)}`;
console.log(`Resuming SSE from message ID: ${state.last_message_id}`);
} else {
console.log('Starting fresh SSE connection');
}
upstreamConnection = new EventSource(sseUrl);
upstreamConnection.onopen = () => {
console.log('Upstream SSE connection opened');
};
upstreamConnection.onmessage = async (event) => {
// Broadcast to all connected clients
const message = `id: ${event.lastEventId || ''}\ndata: ${event.data}\n\n`;
clientConnections.forEach(client => {
try {
client.write(message);
} catch (error) {
console.error('Error writing to client:', error);
clientConnections.delete(client);
}
});
// Save the last message ID
if (event.lastEventId) {
await updateLastMessageId(event.lastEventId, EVENTS_KEY);
}
};
upstreamConnection.onerror = (error) => {
console.error('Upstream SSE connection error:', {
message: error.message,
status: error.status,
});
// Notify clients
const errorMessage = `data: {"type":"error","message":"Connection lost, reconnecting..."}\n\n`;
clientConnections.forEach(client => {
try {
client.write(errorMessage);
} catch (error) {
clientConnections.delete(client);
}
});
// Reconnect after 5 seconds
setTimeout(() => {
if (clientConnections.size > 0) {
console.log('Attempting to reconnect upstream SSE...');
ensureUpstreamConnection();
}
}, 5000);
};
return upstreamConnection;
}
// Status endpoint
export async function handleApiStatus(req, res) {
try {
const state = await getSSEState(EVENTS_KEY);
res.json({
status: 'ok',
upstreamConnection: upstreamConnection ? upstreamConnection.readyState : 'closed',
clientConnections: clientConnections.size,
lastMessageId: state.last_message_id,
lastUpdated: state.updated_at,
});
} catch (error) {
res.status(500).json({
status: 'error',
error: error.message,
});
}
}
// Graceful shutdown
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);
function shutdown() {
console.log('Shutting down, closing connections...');
if (upstreamConnection) {
upstreamConnection.close();
}
clientConnections.clear();
process.exit(0);
}
Key Features:
- Single upstream connection shared across all clients
- Automatic resumption using
last-idparameter - Graceful error handling and reconnection
- Client connection cleanup
- Status endpoint for monitoring
4. Express Server
Create api/server.js:
import express from 'express';
import cors from 'cors';
import path from 'path';
import { fileURLToPath } from 'url';
import { initDatabase } from './db/index.js';
import { handleApiStatus, handleSSE } from './sse.js';
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
const app = express();
const PORT = process.env.PORT || 8080;
// Middleware
app.use(cors());
app.use(express.json());
app.use(express.static(path.join(__dirname, '../html')));
// Routes
app.get('/api/sse', handleSSE);
app.get('/api/status', handleApiStatus);
app.listen(PORT, async () => {
await initDatabase();
console.log(`Server running on http://localhost:${PORT}`);
console.log(`SSE proxy: http://localhost:${PORT}/api/sse`);
console.log(`Status: http://localhost:${PORT}/api/status`);
});
Running the Server
Start the proxy:
node api/server.js
Expected output:
Database initialized
Server running on http://localhost:8080
SSE proxy: http://localhost:8080/api/sse
Status: http://localhost:8080/api/status
Testing the Proxy
Connect a Client
const eventSource = new EventSource('http://localhost:8080/api/sse');
eventSource.onmessage = (event) => {
const data = JSON.parse(event.data);
console.log('Received event:', data);
};
eventSource.onerror = (error) => {
console.error('Connection error:', error);
};
Check Status
curl http://localhost:8080/api/status
Response:
{
"status": "ok",
"upstreamConnection": "OPEN",
"clientConnections": 2,
"lastMessageId": "1755091934020-0",
"lastUpdated": "2025-01-15T10:30:00"
}
Production Considerations
Database Upgrade
SQLite works well for development but can experience locking issues under high concurrency. For production deployments, use PostgreSQL or MySQL. Here's how to migrate the database layer:
import { Pool } from 'pg';
const pool = new Pool({
connectionString: process.env.DATABASE_URL,
});
export async function updateLastMessageId(messageId, endpoint) {
await pool.query(
'UPDATE sse_state SET last_message_id = $1, updated_at = NOW() WHERE endpoint = $2',
[messageId, endpoint]
);
}
Security
Add authentication to prevent unauthorized access to your SSE stream:
function authenticate(req, res, next) {
const apiKey = req.headers['x-api-key'];
if (!apiKey || apiKey !== process.env.CLIENT_API_KEY) {
return res.status(401).json({ error: 'Unauthorized' });
}
next();
}
app.get('/api/sse', authenticate, handleSSE);
For additional protection, implement rate limiting to prevent abuse:
import rateLimit from 'express-rate-limit';
const limiter = rateLimit({
windowMs: 15 * 60 * 1000,
max: 100,
});
app.use('/api/sse', limiter);
Monitoring
Track key metrics to monitor your proxy's health:
let metrics = {
eventsReceived: 0,
eventsBroadcast: 0,
errors: 0,
reconnections: 0,
};
app.get('/api/metrics', (req, res) => {
res.json(metrics);
});
Deployment
Deploy your proxy using Docker for containerized environments:
Create Dockerfile:
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm ci --production
COPY . .
EXPOSE 8080
CMD ["node", "api/server.js"]
Build and run:
docker build -t sse-proxy .
docker run -p 8080:8080 --env-file .env sse-proxy
Troubleshooting
Connection Drops
If connections drop frequently, increase timeout:
upstreamConnection.onerror = (error) => {
// Increase retry delay
setTimeout(() => ensureUpstreamConnection(), 10000);
};
Memory Leaks
Monitor and limit event buffer size:
setInterval(() => {
console.log('Memory usage:', process.memoryUsage());
console.log('Client connections:', clientConnections.size);
console.log('Event buffer size:', eventBuffer.length);
}, 60000);
Database Lock
SQLite can lock under high concurrency. Use WAL mode:
await dbRun('PRAGMA journal_mode=WAL');
Or switch to PostgreSQL for production.
Summary
This SSE proxy implementation provides:
- Reliability — Automatic reconnection with state preservation
- Scalability — Single upstream connection serves multiple clients
- Resilience — Event buffering during disconnections
- Flexibility — Easy to extend with custom logic
The architecture can be adapted for various use cases, from simple event broadcasting to complex event processing pipelines.
Next Steps
- What are Server-Sent Events? — Learn SSE fundamentals
- Package Events — Subscribe to smart contract events
- Deepbook Flux Stream — Stream trading data