Node.js Client Example
This page provides a complete Node.js implementation for connecting to the 8x8 Event Streaming service.
Overview
The Node.js client example demonstrates:
- WebSocket connection using the `ws` library
- Authentication using X-API-Key
- Message reading and payload decoding
- Modern async/await syntax
- Command-line argument parsing
- Clean, pipe-friendly output for integration with tools like `jq`
Prerequisites
- Node.js 14 or higher (includes required async/await and modern JavaScript support)
- npm package manager
- ws library (version 8.16.0 or higher)
Installation
Install Node.js
If you don't have Node.js 14+:
- macOS
- Ubuntu/Debian
- RedHat/CentOS/Fedora
- Windows
macOS — using Homebrew:
brew install node
Or using nvm (recommended):
# Install nvm
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash
# Install latest LTS
nvm install --lts
nvm use --lts
Ubuntu/Debian — using the NodeSource repository (recommended):
# Node.js 20.x LTS
curl -fsSL https://deb.nodesource.com/setup_20.x | sudo -E bash -
sudo apt-get install -y nodejs
# RedHat/CentOS/Fedora — Node.js 20.x LTS
curl -fsSL https://rpm.nodesource.com/setup_20.x | sudo bash -
sudo dnf install -y nodejs
Windows — download from nodejs.org or use Chocolatey:
choco install nodejs
Or use nvm-windows:
nvm install lts
nvm use lts
Install Dependencies
Create a package.json file:
{
"name": "pulsar-simple-client",
"version": "1.0.0",
"description": "Simplified WebSocket client for Apache Pulsar",
"main": "pulsar-simple-client.js",
"bin": {
"pulsar-simple-client": "./pulsar-simple-client.js"
},
"dependencies": {
"ws": "^8.16.0"
},
"engines": {
"node": ">=14.0.0"
}
}
Then install dependencies:
npm install
Complete Example
#!/usr/bin/env node
/**
* Pulsar Simple Client - A simplified WebSocket client for Apache Pulsar
*/
const WebSocket = require('ws');
const { URL } = require('url');
/**
 * Pulsar Simple Client class.
 *
 * Builds the Pulsar WebSocket reader URL from the connection options,
 * authenticates with an X-API-Key (sent both as a query parameter and as an
 * HTTP header), prints each decoded message payload to stdout and
 * acknowledges it. All status and error output goes to stderr.
 */
class PulsarSimpleClient {
  /**
   * @param {object} options - Connection settings.
   * @param {string} options.host - Broker hostname.
   * @param {number} options.port - Broker port.
   * @param {string} options.tenant - Tenant path segment of the topic.
   * @param {string} options.namespace - Namespace path segment of the topic.
   * @param {string} options.topic - Topic name.
   * @param {string} [options.xApiKey] - API key; omitted means no auth data is sent.
   */
  constructor(options) {
    this.host = options.host;
    this.port = options.port;
    this.tenant = options.tenant;
    this.namespace = options.namespace;
    this.topic = options.topic;
    this.xApiKey = options.xApiKey;
  }

  /**
   * Build the Pulsar WebSocket URL.
   *
   * @returns {string} The reader URL, with `x-api-key` appended as a query
   *   parameter when an API key was provided.
   */
  buildUrl() {
    const baseUrl = `wss://${this.host}:${this.port}/ws/v2/reader/persistent/${this.tenant}/${this.namespace}/${this.topic}`;
    const url = new URL(baseUrl);
    // Add X-API-Key as query parameter if provided
    if (this.xApiKey) {
      url.searchParams.set('x-api-key', this.xApiKey);
    }
    return url.toString();
  }

  /**
   * Build the same URL as buildUrl() but with the API key masked, so it can
   * be written to logs without leaking the secret.
   *
   * @returns {string} The reader URL with any `x-api-key` value replaced by `REDACTED`.
   */
  buildRedactedUrl() {
    const url = new URL(this.buildUrl());
    if (url.searchParams.has('x-api-key')) {
      url.searchParams.set('x-api-key', 'REDACTED');
    }
    return url.toString();
  }

  /**
   * Get HTTP headers for authentication.
   *
   * @returns {object} Headers object; contains `X-API-Key` only when an API
   *   key was provided.
   */
  getHeaders() {
    const headers = {};
    // Add X-API-Key header if provided
    if (this.xApiKey) {
      headers['X-API-Key'] = this.xApiKey;
    }
    return headers;
  }

  /**
   * Check if message is a control message (should not be acknowledged).
   *
   * @param {object} msg - Parsed message object.
   * @returns {boolean} true when `type` or `endOfTopic` holds a truthy value.
   */
  isControlMessage(msg) {
    // Coerce explicitly so callers always get a real boolean rather than the
    // raw field value or undefined.
    return Boolean(msg.type) || Boolean(msg.endOfTopic);
  }

  /**
   * Parse Pulsar message and extract decoded payload.
   *
   * @param {string} data - Raw JSON text of one WebSocket frame.
   * @returns {{msg: object, payload: string}|null} The parsed message and its
   *   base64-decoded UTF-8 payload, or null when the frame cannot be parsed
   *   (the error is logged to stderr).
   */
  extractPulsarPayload(data) {
    try {
      const msg = JSON.parse(data);
      const payloadB64 = msg.payload || '';
      // Decode base64 payload
      const payload = Buffer.from(payloadB64, 'base64').toString('utf-8');
      return { msg, payload };
    } catch (err) {
      // console.error already writes to stderr; no options argument exists.
      console.error(`Error extracting payload: ${err.message}`);
      return null;
    }
  }

  /**
   * Send an acknowledgment message for a received message.
   * This is REQUIRED for WebSocket readers to prevent backlog buildup and message delivery stoppage.
   *
   * @param {object} ws - Open WebSocket to send the acknowledgment on.
   * @param {string} messageId - The `messageId` of the message being acknowledged.
   */
  sendAck(ws, messageId) {
    try {
      const ackMsg = { messageId };
      ws.send(JSON.stringify(ackMsg));
    } catch (err) {
      // Best effort: a failed ack is reported but does not stop the reader.
      console.error(`Warning: Failed to send ack: ${err.message}`);
    }
  }

  /**
   * Connect to WebSocket and receive messages.
   *
   * Registers the socket and process-signal handlers and then returns; the
   * process is terminated from those handlers (exit code 1 on a socket
   * error, 0 on close or on SIGINT/SIGTERM).
   *
   * @returns {Promise<void>}
   */
  async connectAndReceive() {
    const wsUrl = this.buildUrl();
    const headers = this.getHeaders();

    // Log the redacted form: the real URL carries the API key.
    console.error(`Connecting to ${this.buildRedactedUrl()}...`);

    // Configure WebSocket options
    const wsOptions = {
      headers: headers,
    };
    const ws = new WebSocket(wsUrl, wsOptions);

    // Handle connection open
    ws.on('open', () => {
      console.error('Successfully connected');
    });

    // Handle incoming messages
    ws.on('message', (data) => {
      const result = this.extractPulsarPayload(data.toString());
      if (!result) {
        return;
      }
      const { msg, payload } = result;
      // Check if this is a control message (don't print or ack these)
      if (this.isControlMessage(msg)) {
        return;
      }
      // Print only the payload (suitable for piping)
      console.log(payload);
      // Send acknowledgment (required for WebSocket flow control)
      this.sendAck(ws, msg.messageId);
    });

    // Handle errors
    ws.on('error', (err) => {
      console.error(`WebSocket error: ${err.message}`);
      process.exit(1);
    });

    // Handle connection close
    ws.on('close', (code, reason) => {
      console.error(`Connection closed: ${code} ${reason}`);
      process.exit(0);
    });

    // Handle process termination: both signals close the socket and exit 0,
    // differing only in the status line written to stderr.
    const signalMessages = [
      ['SIGINT', 'Interrupted by user'],
      ['SIGTERM', 'Terminated'],
    ];
    for (const [signal, message] of signalMessages) {
      process.on(signal, () => {
        console.error(message);
        ws.close();
        process.exit(0);
      });
    }
  }
}
/**
 * Parse command-line arguments.
 *
 * Recognized flags, each followed by a value: --host, --port, --tenant,
 * --namespace, --topic, --x-api-key. Unrecognized arguments are ignored.
 * Prints an error to stderr and exits with code 1 when a recognized flag has
 * no value, when --port is not an integer in 1..65535, or when --tenant is
 * missing.
 *
 * @returns {{host: string, port: number, namespace: string, topic: string,
 *   tenant: string, xApiKey: (string|undefined)}} The connection options,
 *   with defaults applied for host, port, namespace and topic.
 */
function parseArgs() {
  const args = process.argv.slice(2);
  // Example uses euw2 region. For other regions, see developer.8x8.com
  const options = {
    host: 'pulsar-ws-euw2.8x8.com',
    port: 443,
    namespace: 'event-v1',
    topic: 'all',
  };

  // Maps each supported flag to the option key it populates.
  const flagToOption = {
    '--host': 'host',
    '--port': 'port',
    '--tenant': 'tenant',
    '--namespace': 'namespace',
    '--topic': 'topic',
    '--x-api-key': 'xApiKey',
  };

  for (let i = 0; i < args.length; i++) {
    const flag = args[i];
    if (!Object.prototype.hasOwnProperty.call(flagToOption, flag)) {
      continue;
    }
    const value = args[i + 1];
    // A flag given as the final argument has no value; fail loudly instead
    // of storing undefined and building a broken URL later.
    if (value === undefined) {
      console.error(`Error: ${flag} requires a value`);
      process.exit(1);
    }
    options[flagToOption[flag]] = flag === '--port' ? Number.parseInt(value, 10) : value;
    i++; // skip the value just consumed
  }

  // parseInt yields NaN for non-numeric input; reject it and out-of-range ports.
  if (!Number.isInteger(options.port) || options.port < 1 || options.port > 65535) {
    console.error('Error: --port must be an integer between 1 and 65535');
    process.exit(1);
  }

  // Validate required parameters
  if (!options.tenant) {
    console.error('Error: --tenant is required');
    process.exit(1);
  }
  return options;
}
/**
 * Program entry point: read the command-line options, construct the client
 * and start receiving. Any error thrown along the way is reported on stderr
 * and ends the process with exit code 1.
 */
async function main() {
  const reportAndExit = (err) => {
    console.error(`Error: ${err.message}`);
    process.exit(1);
  };

  try {
    const client = new PulsarSimpleClient(parseArgs());
    await client.connectAndReceive();
  } catch (err) {
    reportAndExit(err);
  }
}

// Start the client only when this file is executed directly, not when it is
// loaded via require() from another module.
const isDirectRun = require.main === module;
if (isDirectRun) {
  main();
}

module.exports = { PulsarSimpleClient };
Running the Client
Basic Usage
node pulsar-simple-client.js \
--tenant YOUR_TENANT \
--x-api-key YOUR_API_KEY
Command-Line Options
| Option | Description | Default | Required |
|---|---|---|---|
| `--tenant` | Your 8x8 tenant name | - | Yes |
| `--host` | Pulsar broker hostname | pulsar-ws-euw2.8x8.com (EUW2) | No |
| `--port` | Pulsar broker port | 443 | No |
| `--namespace` | Pulsar namespace | event-v1 | No |
| `--topic` | Topic name | all | No |
| `--x-api-key` | API key for authentication | - | No |
| `--insecure` | Skip TLS certificate verification (not parsed by the example code shown above) | true | No |
Output and Piping
The client outputs only the decoded message payload to stdout, with status messages going to stderr. This makes it perfect for piping:
Pretty-print with jq
node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY | jq .
Filter events
node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY | \
jq 'select(.eventType == "agent.login")'
Save to file
node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY > events.log
Search with grep
node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY | grep "error"
Key Features
Modern JavaScript
The Node.js client uses modern JavaScript features:
// Async/await
async function connectAndReceive(url, apiKey) {
// ...
}
// Arrow functions
ws.on('message', (data) => {
// ...
});
// Template literals
const url = `wss://${host}:${port}/ws/v2/reader/persistent/${tenant}/${namespace}/${topic}`;
Payload Decoding
Messages are decoded in two steps:
// 1. Parse Pulsar message JSON
const pulsarMsg = JSON.parse(data.toString());
// 2. Decode base64 payload
const payload = Buffer.from(pulsarMsg.payload, 'base64').toString('utf-8');
Message Acknowledgement
Acknowledgements are required to prevent message delivery stoppage:
const ackMsg = { messageId };
ws.send(JSON.stringify(ackMsg));
Control Message Filtering
Control messages (like end-of-topic markers) should not be processed:
if (msg.type || msg.endOfTopic) {
return;
}
Clean Output
- stdout: Only decoded message payloads
- stderr: Status messages and errors
This separation ensures piped output remains clean.
Graceful Shutdown
The client handles Ctrl+C gracefully:
process.on('SIGINT', () => {
console.error('Disconnecting...');
ws.close();
});
Dependencies
The client requires:
{
"dependencies": {
"ws": "^8.16.0"
}
}
Install with:
npm install
Making the Script Executable
On Unix-like systems:
chmod +x pulsar-simple-client.js
# Run without node prefix
./pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY
Or install globally:
npm install -g .
# Run from anywhere
pulsar-simple-client --tenant YOUR_TENANT --x-api-key YOUR_KEY
Using npm Scripts
Add to package.json:
{
"scripts": {
"start": "node pulsar-simple-client.js"
}
}
Then run:
npm start -- --tenant YOUR_TENANT --x-api-key YOUR_KEY
Error Handling
The client handles common errors:
- Connection errors: WebSocket connection failures
- JSON parse errors: Malformed Pulsar messages
- Base64 decode errors: Invalid payload encoding
- SSL/TLS errors: Certificate verification issues
- Acknowledgement failures: Issues sending acks (logged as warnings)
All errors are logged to stderr without affecting stdout output.
Troubleshooting
Module not found: 'ws'
Install dependencies:
npm install
SSL Certificate Errors
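The `--insecure` flag is described as enabled by default for testing. Note that the example code shown on this page does not parse `--insecure` and never disables certificate verification, so with that code the flag has no effect. If your copy of the script does implement the flag, turn it off for production: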
node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY --insecure false
Connection Refused
- Verify hostname and port
- Check network connectivity
- Ensure firewall allows outbound connections
Node.js Version Too Old
Check your Node.js version:
node --version
Requires Node.js 14 or higher.
Syntax Errors
If you see syntax errors, your Node.js version may be too old. Modern JavaScript features require Node.js 14+.
Comparison with Other Clients
| Feature | Node.js | Python | Go |
|---|---|---|---|
| Language | JavaScript | Python | Go |
| Async Model | Async/await | Async/await | Goroutines |
| Dependencies | ws library | websockets library | gorilla/websocket |
| Build Required | No | No | Yes |
| Package Manager | npm | pip | go mod |
| Best For | Backend services | Data pipelines | High performance |
Next Steps
- Python Client Example - Python implementation
- Go Client Example - Go implementation
- Message Format - Understanding message structure
- Troubleshooting - Common issues and solutions