Skip to main content

Python Client Example

This page provides a complete Python implementation for connecting to the 8x8 Event Streaming service.

Overview

The Python client example demonstrates:

  • Async/await WebSocket connection using the websockets library
  • Authentication using X-API-Key
  • Message reading and payload decoding
  • Command-line argument parsing with argparse
  • Clean, pipe-friendly output for integration with tools like jq

Prerequisites

  • Python 3.7 or higher (includes required asyncio support)
  • pip package manager
  • websockets library (version 12.0 or higher)

Installation

Install Python

If you don't have Python 3.7+:

# Using Homebrew
brew install python3
# Create virtual environment
python3 -m venv venv

# Activate virtual environment
# On macOS/Linux:
source venv/bin/activate

# On Windows:
venv\Scripts\activate

Install Dependencies

Create a requirements.txt file:

websockets>=12.0,<14.0

Then install:

pip install -r requirements.txt

Or install directly:

pip install 'websockets>=12.0,<14.0'

Complete Example

#!/usr/bin/env python3
"""
Pulsar Simple Client - A simplified WebSocket client for Apache Pulsar
"""

import argparse
import asyncio
import base64
import json
import logging
import sys
import traceback
from urllib.parse import urlencode, urlparse

import websockets


class PulsarSimpleClient:
"""Simple WebSocket client for Apache Pulsar reader endpoints"""

def __init__(self, host, port, tenant, namespace, topic, x_api_key=None):
self.host = host
self.port = port
self.tenant = tenant
self.namespace = namespace
self.topic = topic
self.x_api_key = x_api_key

def build_url(self):
"""Build the Pulsar WebSocket URL"""
base_url = f"wss://{self.host}:{self.port}/ws/v2/reader/persistent/{self.tenant}/{self.namespace}/{self.topic}"

# Add X-API-Key as query parameter if provided
if self.x_api_key:
query_params = {"x-api-key": self.x_api_key}
base_url = f"{base_url}?{urlencode(query_params)}"

return base_url

def get_headers(self):
"""Build HTTP headers for authentication"""
headers = {}

# Add X-API-Key header if provided
if self.x_api_key:
headers["X-API-Key"] = self.x_api_key

return headers

def is_control_message(self, msg):
"""Check if message is a control message (should not be acknowledged)"""
return msg.get("type") or msg.get("endOfTopic")

def extract_pulsar_payload(self, data):
"""Parse Pulsar message and extract decoded payload"""
try:
msg = json.loads(data)
payload_b64 = msg.get("payload", "")

# Decode base64 payload
payload_bytes = base64.b64decode(payload_b64)
payload = payload_bytes.decode("utf-8")

return msg, payload
except (json.JSONDecodeError, KeyError, base64.binascii.Error) as e:
logging.error(f"Error extracting payload: {e}")
return None, None

async def send_ack(self, websocket, message_id):
"""
Send an acknowledgment message for a received message.
This is REQUIRED for WebSocket readers to prevent backlog buildup and message delivery stoppage.
"""
try:
ack_msg = {"messageId": message_id}
await websocket.send(json.dumps(ack_msg))
except Exception as e:
logging.warning(f"Failed to send ack: {e}")

async def connect_and_receive(self):
"""Connect to WebSocket and receive messages"""
ws_url = self.build_url()
headers = self.get_headers()

logging.info(f"Connecting to {ws_url}...")

try:
async with websockets.connect(
ws_url,
extra_headers=headers,
ssl=True, # Use default SSL verification
ping_interval=20,
ping_timeout=10
) as websocket:
logging.info("Successfully connected")

# Read messages continuously
async for message in websocket:
msg, payload = self.extract_pulsar_payload(message)
if msg and payload:
# Check if this is a control message (don't print or ack these)
if self.is_control_message(msg):
continue

# Print only the payload (suitable for piping)
print(payload, flush=True)

# Send acknowledgment (required for WebSocket flow control)
await self.send_ack(websocket, msg.get("messageId"))

except websockets.exceptions.WebSocketException as e:
logging.error(f"WebSocket error: {type(e).__name__}: {e}")
logging.error(traceback.format_exc())
sys.exit(1)
except Exception as e:
logging.error(f"Error: {type(e).__name__}: {e}")
logging.error(traceback.format_exc())
sys.exit(1)


def main():
"""Main entry point"""
parser = argparse.ArgumentParser(
description="Simplified WebSocket client for Apache Pulsar"
)

# Connection parameters
# Example uses euw2 region. For other regions, see developer.8x8.com
parser.add_argument(
"--host",
default="pulsar-ws-euw2.8x8.com",
help="Pulsar broker hostname (default: pulsar-ws-euw2.8x8.com)"
)
parser.add_argument(
"--port",
type=int,
default=443,
help="Pulsar broker port (default: 443)"
)
parser.add_argument(
"--tenant",
required=True,
help="Pulsar tenant name (required)"
)
parser.add_argument(
"--namespace",
default="event-v1",
help="Pulsar namespace (default: event-v1)"
)
parser.add_argument(
"--topic",
default="all",
help="Pulsar topic name (default: all)"
)
parser.add_argument(
"--x-api-key",
help="X-API-Key header value"
)

args = parser.parse_args()

# Configure logging (to stderr so it doesn't interfere with piped output)
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s - %(levelname)s - %(message)s",
stream=sys.stderr
)

# Create client and connect
client = PulsarSimpleClient(
host=args.host,
port=args.port,
tenant=args.tenant,
namespace=args.namespace,
topic=args.topic,
x_api_key=args.x_api_key
)

# Run the async client
try:
asyncio.run(client.connect_and_receive())
except KeyboardInterrupt:
logging.info("Interrupted by user")
sys.exit(0)


if __name__ == "__main__":
main()

Running the Client

Basic Usage

python3 pulsar_simple_client.py \
--tenant YOUR_TENANT \
--x-api-key YOUR_API_KEY

Command-Line Options

OptionDescriptionDefaultRequired
--tenantYour 8x8 tenant name-Yes
--hostPulsar broker hostnamepulsar-ws-euw2.8x8.com (EUW2)No
--portPulsar broker port443No
--namespacePulsar namespaceevent-v1No
--topicTopic nameallNo
--x-api-keyAPI key for authentication-No
--insecureSkip TLS certificate verificationtrueNo

Output and Piping

The client outputs only the decoded message payload to stdout, with status messages going to stderr. This makes it perfect for piping:

Pretty-print with jq

python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY | jq .

Filter events

python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY | \
jq 'select(.eventType == "agent.login")'

Save to file

python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY > events.log

Search with grep

python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY | grep "error"

Key Features

Async/Await Design

The Python client uses modern async/await syntax for efficient WebSocket handling:

async with websockets.connect(url, additional_headers=headers, ssl=ssl_context) as websocket:
async for message in websocket:
# Process message
pass

Payload Decoding

Messages are decoded in two steps:

# 1. Parse Pulsar message JSON
pulsar_msg = json.loads(message)

# 2. Decode base64 payload
payload = base64.b64decode(pulsar_msg['payload']).decode('utf-8')

Message Acknowledgement

Acknowledgements are required to prevent message delivery stoppage:

ack_msg = {"messageId": message_id}
await websocket.send(json.dumps(ack_msg))

Control Message Filtering

Control messages (like end-of-topic markers) should not be processed:

if msg.get("type") or msg.get("endOfTopic"):
continue

Clean Output

  • stdout: Only decoded message payloads
  • stderr: Status messages and errors

This separation ensures piped output remains clean.

Dependencies

The client requires:

websockets>=12.0

Install with:

pip install -r requirements.txt

Virtual Environment

Using a virtual environment is recommended to avoid dependency conflicts:

# Create and activate
python3 -m venv venv
source venv/bin/activate # macOS/Linux
# venv\Scripts\activate # Windows

# Install dependencies
pip install -r requirements.txt

# Run client
python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY

# Deactivate when done
deactivate

Making the Script Executable

chmod +x pulsar_simple_client.py

# Run without python3 prefix
./pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY

Error Handling

The client handles common errors:

  • Connection errors: WebSocket connection failures
  • JSON parse errors: Malformed Pulsar messages
  • Base64 decode errors: Invalid payload encoding
  • SSL/TLS errors: Certificate verification issues
  • Acknowledgement failures: Issues sending acks (logged as warnings)

All errors are logged to stderr without affecting stdout output.

Troubleshooting

ImportError: No module named 'websockets'

Install dependencies:

pip install -r requirements.txt

SSL Certificate Errors

The --insecure flag is enabled by default for testing. For production:

python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY --insecure False

Connection Refused

  • Verify hostname and port
  • Check network connectivity
  • Ensure firewall allows outbound connections

Python Version Too Old

Check your Python version:

python3 --version

Requires Python 3.7 or higher.

Next Steps