Node.js Client Example

This page provides a complete Node.js implementation for connecting to the 8x8 Event Streaming service.

Overview

The Node.js client example demonstrates:

  • WebSocket connection using the ws library
  • Authentication using X-API-Key
  • Message reading and payload decoding
  • Modern async/await syntax
  • Command-line argument parsing
  • Clean, pipe-friendly output for integration with tools like jq

Prerequisites

  • Node.js 14 or higher (includes required async/await and modern JavaScript support)
  • npm package manager
  • ws library (version 8.16.0 or higher)

Installation

Install Node.js

If you don't have Node.js 14+:

Using Homebrew:

brew install node

Or using nvm (recommended):

# Install nvm
curl -o- https://raw.githubusercontent.com/nvm-sh/nvm/v0.39.0/install.sh | bash

# Install latest LTS
nvm install --lts
nvm use --lts

Install Dependencies

Create a package.json file:

{
  "name": "pulsar-simple-client",
  "version": "1.0.0",
  "description": "Simplified WebSocket client for Apache Pulsar",
  "main": "pulsar-simple-client.js",
  "bin": {
    "pulsar-simple-client": "./pulsar-simple-client.js"
  },
  "dependencies": {
    "ws": "^8.18.0"
  },
  "engines": {
    "node": ">=14.0.0"
  }
}

Then install dependencies:

npm install

Complete Example

#!/usr/bin/env node
/**
 * Pulsar Simple Client - A simplified WebSocket client for Apache Pulsar
 */

const WebSocket = require('ws');
const { URL } = require('url');

/**
 * Pulsar Simple Client class
 */
class PulsarSimpleClient {
  constructor(options) {
    this.host = options.host;
    this.port = options.port;
    this.tenant = options.tenant;
    this.namespace = options.namespace;
    this.topic = options.topic;
    this.xApiKey = options.xApiKey;
  }

  /**
   * Build the Pulsar WebSocket URL
   */
  buildUrl() {
    const baseUrl = `wss://${this.host}:${this.port}/ws/v2/reader/persistent/${this.tenant}/${this.namespace}/${this.topic}`;
    const url = new URL(baseUrl);

    // Add X-API-Key as query parameter if provided
    if (this.xApiKey) {
      url.searchParams.set('x-api-key', this.xApiKey);
    }

    return url.toString();
  }

  /**
   * Get HTTP headers for authentication
   */
  getHeaders() {
    const headers = {};

    // Add X-API-Key header if provided
    if (this.xApiKey) {
      headers['X-API-Key'] = this.xApiKey;
    }

    return headers;
  }

  /**
   * Check if message is a control message (should not be acknowledged)
   */
  isControlMessage(msg) {
    return (msg.type && msg.type !== '') || (msg.endOfTopic && msg.endOfTopic !== '');
  }

  /**
   * Parse Pulsar message and extract decoded payload
   */
  extractPulsarPayload(data) {
    try {
      const msg = JSON.parse(data);
      const payloadB64 = msg.payload || '';

      // Decode base64 payload
      const payloadBuffer = Buffer.from(payloadB64, 'base64');
      const payload = payloadBuffer.toString('utf-8');

      return { msg, payload };
    } catch (err) {
      // console.error already writes to stderr; it takes no options argument
      console.error(`Error extracting payload: ${err.message}`);
      return null;
    }
  }

  /**
   * Send an acknowledgment message for a received message
   * This is REQUIRED for WebSocket readers to prevent backlog buildup and message delivery stoppage
   */
  sendAck(ws, messageId) {
    try {
      const ackMsg = { messageId };
      ws.send(JSON.stringify(ackMsg));
    } catch (err) {
      console.error(`Warning: Failed to send ack: ${err.message}`);
    }
  }

  /**
   * Connect to WebSocket and receive messages
   */
  async connectAndReceive() {
    const wsUrl = this.buildUrl();
    const headers = this.getHeaders();

    // Log the URL without its query string so the API key is not written to stderr
    console.error(`Connecting to ${wsUrl.split('?')[0]}...`);

    // Configure WebSocket options
    const wsOptions = {
      headers: headers,
    };

    const ws = new WebSocket(wsUrl, wsOptions);

    // Handle connection open
    ws.on('open', () => {
      console.error('Successfully connected');
    });

    // Handle incoming messages
    ws.on('message', (data) => {
      const result = this.extractPulsarPayload(data.toString());
      if (result) {
        const { msg, payload } = result;

        // Check if this is a control message (don't print or ack these)
        if (this.isControlMessage(msg)) {
          return;
        }

        // Print only the payload (suitable for piping)
        console.log(payload);

        // Send acknowledgment (required for WebSocket flow control)
        this.sendAck(ws, msg.messageId);
      }
    });

    // Handle errors
    ws.on('error', (err) => {
      console.error(`WebSocket error: ${err.message}`);
      process.exit(1);
    });

    // Handle connection close
    ws.on('close', (code, reason) => {
      console.error(`Connection closed: ${code} ${reason}`);
      process.exit(0);
    });

    // Handle process termination
    process.on('SIGINT', () => {
      console.error('Interrupted by user');
      ws.close();
      process.exit(0);
    });

    process.on('SIGTERM', () => {
      console.error('Terminated');
      ws.close();
      process.exit(0);
    });
  }
}

/**
 * Parse command-line arguments
 */
function parseArgs() {
  const args = process.argv.slice(2);
  // Example uses euw2 region. For other regions, see developer.8x8.com
  const options = {
    host: 'pulsar-ws-euw2.8x8.com',
    port: 443,
    namespace: 'event-v1',
    topic: 'all',
  };

  for (let i = 0; i < args.length; i++) {
    const arg = args[i];
    const nextArg = args[i + 1];

    switch (arg) {
      case '--host':
        options.host = nextArg;
        i++;
        break;
      case '--port':
        options.port = parseInt(nextArg, 10);
        i++;
        break;
      case '--tenant':
        options.tenant = nextArg;
        i++;
        break;
      case '--namespace':
        options.namespace = nextArg;
        i++;
        break;
      case '--topic':
        options.topic = nextArg;
        i++;
        break;
      case '--x-api-key':
        options.xApiKey = nextArg;
        i++;
        break;
    }
  }

  // Validate required parameters
  if (!options.tenant) {
    console.error('Error: --tenant is required');
    process.exit(1);
  }

  return options;
}

/**
 * Main entry point
 */
async function main() {
  try {
    const options = parseArgs();
    const client = new PulsarSimpleClient(options);
    await client.connectAndReceive();
  } catch (err) {
    console.error(`Error: ${err.message}`);
    process.exit(1);
  }
}

// Run main function
if (require.main === module) {
  main();
}

module.exports = { PulsarSimpleClient };

Running the Client

Basic Usage

node pulsar-simple-client.js \
  --tenant YOUR_TENANT \
  --x-api-key YOUR_API_KEY

Command-Line Options

Option        Description                         Default                         Required
--tenant      Your 8x8 tenant name                -                               Yes
--host        Pulsar broker hostname              pulsar-ws-euw2.8x8.com (EUW2)   No
--port        Pulsar broker port                  443                             No
--namespace   Pulsar namespace                    event-v1                        No
--topic       Topic name                          all                             No
--x-api-key   API key for authentication          -                               No
--insecure    Skip TLS certificate verification   true                            No
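
The options above could also be parsed with Node's built-in util.parseArgs instead of a hand-written loop. The sketch below is an alternative, not the parser used in the complete example. It assumes Node.js 18.11 or newer (later than the Node.js 14 minimum stated in the prerequisites), and the function name parseCliOptions is hypothetical. The --insecure option is left out because the complete example does not read it.

```javascript
const { parseArgs } = require('util');

// Hypothetical alternative parser using the option names and defaults
// listed in the table above.
function parseCliOptions(argv) {
  const { values } = parseArgs({
    args: argv,
    options: {
      host: { type: 'string', default: 'pulsar-ws-euw2.8x8.com' },
      port: { type: 'string', default: '443' },
      tenant: { type: 'string' },
      namespace: { type: 'string', default: 'event-v1' },
      topic: { type: 'string', default: 'all' },
      'x-api-key': { type: 'string' },
    },
  });

  if (!values.tenant) {
    throw new Error('--tenant is required');
  }

  // util.parseArgs only yields strings and booleans, so convert the port here.
  const port = Number.parseInt(values.port, 10);
  if (!Number.isInteger(port) || port < 1 || port > 65535) {
    throw new Error(`invalid --port value: ${values.port}`);
  }

  return {
    host: values.host,
    port,
    tenant: values.tenant,
    namespace: values.namespace,
    topic: values.topic,
    xApiKey: values['x-api-key'],
  };
}
```

Calling parseCliOptions(process.argv.slice(2)) would yield an object with the same shape as the one the PulsarSimpleClient constructor expects. By default util.parseArgs runs in strict mode and throws on unknown flags, whereas the hand-written loop silently ignores them.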

Output and Piping

The client outputs only the decoded message payload to stdout, with status messages going to stderr. This makes it perfect for piping:

Pretty-print with jq

node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY | jq .

Filter events

node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY | \
jq 'select(.eventType == "agent.login")'

Save to file

node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY > events.log

Search with grep

node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY | grep "error"
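
The same kind of filtering can be done in Node.js when jq is not available. This is a minimal sketch that assumes each payload arrives as one JSON object per line and carries an eventType field, as the jq filter above suggests. The file name filter-events.js and the function names are hypothetical.

```javascript
// filter-events.js (hypothetical file name): keep only events of one type.
const readline = require('readline');

// Returns true when `line` is a JSON object whose eventType equals `wanted`.
// Lines that are not valid JSON are treated as non-matching instead of
// crashing the pipeline.
function matchesEventType(line, wanted) {
  try {
    const event = JSON.parse(line);
    return event !== null && typeof event === 'object' && event.eventType === wanted;
  } catch (err) {
    return false;
  }
}

// Reads stdin line by line and echoes matching lines to stdout.
function filterStdin(wanted) {
  const rl = readline.createInterface({ input: process.stdin, crlfDelay: Infinity });
  rl.on('line', (line) => {
    if (matchesEventType(line, wanted)) {
      console.log(line);
    }
  });
}

// Only start reading when an event type is given on the command line.
if (process.argv[2]) {
  filterStdin(process.argv[2]);
}
```

Usage would look like: node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY | node filter-events.js agent.login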

Key Features

Modern JavaScript

The Node.js client uses modern JavaScript features:

// Async/await
async function connectAndReceive(url, apiKey) {
  // ...
}

// Arrow functions
ws.on('message', (data) => {
  // ...
});

// Template literals
const url = `wss://${host}:${port}/ws/v2/reader/persistent/${tenant}/${namespace}/${topic}`;

Payload Decoding

Messages are decoded in two steps:

// 1. Parse Pulsar message JSON
const pulsarMsg = JSON.parse(data.toString());

// 2. Decode base64 payload
const payload = Buffer.from(pulsarMsg.payload, 'base64').toString('utf-8');
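
To make the two steps concrete, here is a small worked example that runs without a broker. The sample frame is hypothetical: the messageId value and the event fields are invented for illustration and do not come from the 8x8 service.

```javascript
// Decode one Pulsar WebSocket frame into its envelope and UTF-8 payload.
function decodeFrame(data) {
  const msg = JSON.parse(data.toString());
  const payload = Buffer.from(msg.payload || '', 'base64').toString('utf-8');
  return { msg, payload };
}

// Hypothetical sample frame, built the way a broker frame is assumed to look:
// a JSON envelope whose payload field holds base64-encoded bytes.
const sampleFrame = JSON.stringify({
  messageId: 'CAAQAw==',
  payload: Buffer.from('{"eventType":"agent.login"}').toString('base64'),
});

const { msg, payload } = decodeFrame(sampleFrame);
console.error(`messageId: ${msg.messageId}`); // status output goes to stderr
console.log(payload); // prints {"eventType":"agent.login"}
```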

Message Acknowledgement

Every data message must be acknowledged. Without acknowledgements the backlog builds up and the broker stops delivering messages:

const ackMsg = { messageId };
ws.send(JSON.stringify(ackMsg));
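
A slightly more defensive variant might check that the socket is still open before sending. This is a sketch, not the behaviour of the complete example: buildAck and ackIfOpen are hypothetical helper names, and the ws argument is assumed to expose readyState and send() as a ws WebSocket instance does.

```javascript
// Build the JSON acknowledgement frame for one message.
function buildAck(messageId) {
  if (typeof messageId !== 'string' || messageId === '') {
    throw new TypeError('messageId must be a non-empty string');
  }
  return JSON.stringify({ messageId });
}

// Send the ack only while the connection is open.
// Returns true if the ack was handed to the socket, false otherwise.
function ackIfOpen(ws, messageId) {
  const OPEN = 1; // readyState value for an open WebSocket connection
  if (ws.readyState !== OPEN) {
    return false;
  }
  ws.send(buildAck(messageId));
  return true;
}
```

Skipping the send on a closed socket avoids an exception during shutdown, at the cost of leaving the last message unacknowledged.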

Control Message Filtering

Control messages (such as end-of-topic markers) are skipped: they are neither printed nor acknowledged.

if (msg.type || msg.endOfTopic) {
  return;
}

Clean Output

  • stdout: Only decoded message payloads
  • stderr: Status messages and errors

This separation ensures piped output remains clean.

Graceful Shutdown

The client handles Ctrl+C gracefully:

process.on('SIGINT', () => {
  console.error('Disconnecting...');
  ws.close();
});
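
If the process should exit only after the close handshake has finished, the shutdown could wait for the 'close' event with a timeout as a safety net. The following is a sketch under assumptions: closeGracefully and installShutdownHandler are hypothetical names, the 2000 ms default is arbitrary, and ws is assumed to be an event emitter with a close() method, as a ws WebSocket instance is.

```javascript
// Ask the socket to close, then resolve once 'close' fires or the timeout
// elapses, whichever comes first.
function closeGracefully(ws, timeoutMs = 2000) {
  return new Promise((resolve) => {
    const timer = setTimeout(() => resolve('timeout'), timeoutMs);
    ws.once('close', () => {
      clearTimeout(timer);
      resolve('closed');
    });
    ws.close();
  });
}

// Wiring sketch: call installShutdownHandler(ws) after creating the socket.
function installShutdownHandler(ws) {
  process.once('SIGINT', async () => {
    console.error('Disconnecting...');
    const outcome = await closeGracefully(ws);
    console.error(`Shutdown finished (${outcome})`);
    process.exit(0);
  });
}
```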

Dependencies

The client requires the ws library, declared in package.json as shown in the Installation section:

{
  "dependencies": {
    "ws": "^8.18.0"
  }
}

Install with:

npm install

Making the Script Executable

On Unix-like systems:

chmod +x pulsar-simple-client.js

# Run without node prefix
./pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY

Or install globally:

npm install -g .

# Run from anywhere
pulsar-simple-client --tenant YOUR_TENANT --x-api-key YOUR_KEY

Using npm Scripts

Add to package.json:

{
  "scripts": {
    "start": "node pulsar-simple-client.js"
  }
}

Then run:

npm start -- --tenant YOUR_TENANT --x-api-key YOUR_KEY

Error Handling

The client handles common errors:

  • Connection errors: WebSocket connection failures
  • JSON parse errors: Malformed Pulsar messages
  • Base64 decode errors: Invalid payload encoding
  • SSL/TLS errors: Certificate verification issues
  • Acknowledgement failures: Issues sending acks (logged as warnings)

All errors are logged to stderr without affecting stdout output.
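
One caveat on base64 handling: Buffer.from(value, 'base64') does not throw on malformed input, so an invalid payload comes out as garbled text rather than an error. If strict detection is wanted, a validating wrapper along these lines could be used. It is a sketch that assumes payloads use standard, padded base64; decodeBase64Strict is a hypothetical name.

```javascript
// Matches standard base64 with correct padding (an empty string is allowed).
const BASE64_PATTERN = /^(?:[A-Za-z0-9+/]{4})*(?:[A-Za-z0-9+/]{2}==|[A-Za-z0-9+/]{3}=)?$/;

// Decode a base64 string to UTF-8 text, throwing if the input is malformed.
function decodeBase64Strict(b64) {
  if (typeof b64 !== 'string' || !BASE64_PATTERN.test(b64)) {
    throw new Error('payload is not valid base64');
  }
  return Buffer.from(b64, 'base64').toString('utf-8');
}

try {
  console.log(decodeBase64Strict('aGVsbG8=')); // prints hello
  decodeBase64Strict('not base64!');
} catch (err) {
  console.error(`Error extracting payload: ${err.message}`);
}
```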

Troubleshooting

Module not found: 'ws'

Install dependencies:

npm install

SSL Certificate Errors

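The options table lists --insecure as enabled by default for testing. Note that parseArgs in the complete example has no case for this flag, so the script as shown ignores it and uses Node.js default certificate verification. In a version that implements the flag, disable it for production: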

node pulsar-simple-client.js --tenant YOUR_TENANT --x-api-key YOUR_KEY --insecure false
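
A minimal sketch of how an --insecure value could be translated into connection options. The helper names parseInsecure and buildWsOptions are hypothetical. rejectUnauthorized is the standard Node.js TLS option, which ws forwards to the underlying HTTPS request. Unlike the default listed in the options table, this sketch verifies certificates unless told otherwise.

```javascript
// Interpret the value following --insecure; only the literal "true" enables it.
function parseInsecure(value) {
  return String(value).toLowerCase() === 'true';
}

// Build the options object passed as the second argument to new WebSocket().
function buildWsOptions({ headers = {}, insecure = false } = {}) {
  return {
    headers,
    // false disables certificate verification; use only for testing.
    rejectUnauthorized: !insecure,
  };
}

// Example: options for a production connection with an API key header.
const productionOptions = buildWsOptions({
  headers: { 'X-API-Key': 'YOUR_API_KEY' },
  insecure: parseInsecure('false'),
});
console.error(`TLS verification enabled: ${productionOptions.rejectUnauthorized}`);
```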

Connection Refused

  • Verify hostname and port
  • Check network connectivity
  • Ensure firewall allows outbound connections

Node.js Version Too Old

Check your Node.js version:

node --version

Requires Node.js 14 or higher.
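
The version check can also be done inside the script so that an old runtime fails with a clear message. A minimal sketch, with hypothetical function names:

```javascript
// Extract the major version number from a string such as "14.21.3".
function nodeMajor(version = process.versions.node) {
  return Number.parseInt(version.split('.')[0], 10);
}

// Returns true when the given (or current) Node.js version is new enough.
function meetsMinimum(minMajor = 14, version = process.versions.node) {
  return nodeMajor(version) >= minMajor;
}

if (!meetsMinimum(14)) {
  console.error(`Node.js 14 or higher is required, found ${process.versions.node}`);
  process.exit(1);
}
```

Because process.versions.node has no leading "v", the string can be split directly.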

Syntax Errors

If you see syntax errors, your Node.js version may be too old. Modern JavaScript features require Node.js 14+.

Comparison with Other Clients

Feature           Node.js            Python               Go
Language          JavaScript         Python               Go
Async Model       Async/await        Async/await          Goroutines
Dependencies      ws library         websockets library   gorilla/websocket
Build Required    No                 No                   Yes
Package Manager   npm                pip                  go mod
Best For          Backend services   Data pipelines       High performance

Next Steps