Python Client Example
This page provides a complete Python implementation for connecting to the 8x8 Event Streaming service.
Overview
The Python client example demonstrates:
- Async/await WebSocket connection using the websockets library
- Authentication using X-API-Key
- Message reading and payload decoding
- Command-line argument parsing with argparse
- Clean, pipe-friendly output for integration with tools like jq
Prerequisites
- Python 3.8 or higher (includes the required asyncio support; websockets 12.0 dropped support for Python 3.7)
- pip package manager
- websockets library (version 12.0 or higher, below 14.0)
Installation
Install Python
If you don't have Python 3.8 or higher, install it for your platform:
macOS
# Using Homebrew
brew install python3
Ubuntu/Debian
sudo apt update
sudo apt install python3 python3-pip python3-venv
RedHat/CentOS/Fedora
sudo dnf install python3 python3-pip
Windows
Download from python.org or use Chocolatey:
choco install python3
Set Up Virtual Environment (Recommended)
# Create virtual environment
python3 -m venv venv
# Activate virtual environment
# On macOS/Linux:
source venv/bin/activate
# On Windows:
venv\Scripts\activate
Install Dependencies
Create a requirements.txt file:
websockets>=12.0,<14.0
Then install:
pip install -r requirements.txt
Or install directly:
pip install 'websockets>=12.0,<14.0'
Complete Example
#!/usr/bin/env python3
"""
Pulsar Simple Client - A simplified WebSocket client for Apache Pulsar
"""
import argparse
import asyncio
import base64
import binascii
import json
import logging
import sys
import traceback
from urllib.parse import urlencode

import websockets


class PulsarSimpleClient:
    """Simple WebSocket client for Apache Pulsar reader endpoints"""

    def __init__(self, host, port, tenant, namespace, topic, x_api_key=None):
        self.host = host
        self.port = port
        self.tenant = tenant
        self.namespace = namespace
        self.topic = topic
        self.x_api_key = x_api_key

    def build_url(self):
        """Build the Pulsar WebSocket URL"""
        base_url = f"wss://{self.host}:{self.port}/ws/v2/reader/persistent/{self.tenant}/{self.namespace}/{self.topic}"
        # Add X-API-Key as query parameter if provided
        if self.x_api_key:
            query_params = {"x-api-key": self.x_api_key}
            base_url = f"{base_url}?{urlencode(query_params)}"
        return base_url

    def get_headers(self):
        """Build HTTP headers for authentication"""
        headers = {}
        # Add X-API-Key header if provided
        if self.x_api_key:
            headers["X-API-Key"] = self.x_api_key
        return headers

    def is_control_message(self, msg):
        """Check if message is a control message (should not be acknowledged)"""
        return bool(msg.get("type") or msg.get("endOfTopic"))

    def extract_pulsar_payload(self, data):
        """Parse Pulsar message and extract decoded payload"""
        try:
            msg = json.loads(data)
            payload_b64 = msg.get("payload", "")
            # Decode base64 payload
            payload_bytes = base64.b64decode(payload_b64)
            payload = payload_bytes.decode("utf-8")
            return msg, payload
        except (json.JSONDecodeError, binascii.Error, UnicodeDecodeError) as e:
            logging.error(f"Error extracting payload: {e}")
            return None, None

    async def send_ack(self, websocket, message_id):
        """
        Send an acknowledgment message for a received message.
        This is REQUIRED for WebSocket readers to prevent backlog buildup and message delivery stoppage.
        """
        try:
            ack_msg = {"messageId": message_id}
            await websocket.send(json.dumps(ack_msg))
        except Exception as e:
            logging.warning(f"Failed to send ack: {e}")

    async def connect_and_receive(self):
        """Connect to WebSocket and receive messages"""
        ws_url = self.build_url()
        headers = self.get_headers()
        # Log the URL without its query string so the API key stays out of the logs
        logging.info(f"Connecting to {ws_url.split('?', 1)[0]}...")
        try:
            async with websockets.connect(
                ws_url,
                extra_headers=headers,
                ssl=True,  # Use default SSL verification
                ping_interval=20,
                ping_timeout=10
            ) as websocket:
                logging.info("Successfully connected")
                # Read messages continuously
                async for message in websocket:
                    msg, payload = self.extract_pulsar_payload(message)
                    if msg and payload:
                        # Check if this is a control message (don't print or ack these)
                        if self.is_control_message(msg):
                            continue
                        # Print only the payload (suitable for piping)
                        print(payload, flush=True)
                        # Send acknowledgment (required for WebSocket flow control)
                        await self.send_ack(websocket, msg.get("messageId"))
        except websockets.exceptions.WebSocketException as e:
            logging.error(f"WebSocket error: {type(e).__name__}: {e}")
            logging.error(traceback.format_exc())
            sys.exit(1)
        except Exception as e:
            logging.error(f"Error: {type(e).__name__}: {e}")
            logging.error(traceback.format_exc())
            sys.exit(1)


def main():
    """Main entry point"""
    parser = argparse.ArgumentParser(
        description="Simplified WebSocket client for Apache Pulsar"
    )
    # Connection parameters
    # Example uses euw2 region. For other regions, see developer.8x8.com
    parser.add_argument(
        "--host",
        default="pulsar-ws-euw2.8x8.com",
        help="Pulsar broker hostname (default: pulsar-ws-euw2.8x8.com)"
    )
    parser.add_argument(
        "--port",
        type=int,
        default=443,
        help="Pulsar broker port (default: 443)"
    )
    parser.add_argument(
        "--tenant",
        required=True,
        help="Pulsar tenant name (required)"
    )
    parser.add_argument(
        "--namespace",
        default="event-v1",
        help="Pulsar namespace (default: event-v1)"
    )
    parser.add_argument(
        "--topic",
        default="all",
        help="Pulsar topic name (default: all)"
    )
    parser.add_argument(
        "--x-api-key",
        help="X-API-Key header value"
    )
    args = parser.parse_args()

    # Configure logging (to stderr so it doesn't interfere with piped output)
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s - %(levelname)s - %(message)s",
        stream=sys.stderr
    )

    # Create client and connect
    client = PulsarSimpleClient(
        host=args.host,
        port=args.port,
        tenant=args.tenant,
        namespace=args.namespace,
        topic=args.topic,
        x_api_key=args.x_api_key
    )

    # Run the async client
    try:
        asyncio.run(client.connect_and_receive())
    except KeyboardInterrupt:
        logging.info("Interrupted by user")
        sys.exit(0)


if __name__ == "__main__":
    main()
Running the Client
Basic Usage
python3 pulsar_simple_client.py \
--tenant YOUR_TENANT \
--x-api-key YOUR_API_KEY
Command-Line Options
| Option | Description | Default | Required |
|---|---|---|---|
| --tenant | Your 8x8 tenant name | - | Yes |
| --host | Pulsar broker hostname | pulsar-ws-euw2.8x8.com (EUW2) | No |
| --port | Pulsar broker port | 443 | No |
| --namespace | Pulsar namespace | event-v1 | No |
| --topic | Topic name | all | No |
| --x-api-key | API key for authentication | - | No |
The script shown above does not define an --insecure option: it connects with ssl=True, so TLS certificates are always verified.
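As a rough illustration of how these options combine, the sketch below assembles the reader URL in the same shape as build_url() in the complete example. The function name build_reader_url is hypothetical, and percent-encoding the path segments with quote() is an addition that the original script does not perform.

```python
from urllib.parse import quote, urlencode


def build_reader_url(host, port, tenant, namespace="event-v1", topic="all", api_key=None):
    """Assemble a reader URL from the command-line options (illustrative helper)."""
    # Percent-encode each path segment; this is an extra safety step, not part of the original script.
    path = "/".join(quote(part, safe="") for part in (tenant, namespace, topic))
    url = f"wss://{host}:{port}/ws/v2/reader/persistent/{path}"
    if api_key:
        # The original script passes the key as an x-api-key query parameter.
        url = f"{url}?{urlencode({'x-api-key': api_key})}"
    return url


print(build_reader_url("pulsar-ws-euw2.8x8.com", 443, "YOUR_TENANT"))
```

With only --tenant supplied, the defaults from the table fill in the namespace and topic segments.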
Output and Piping
The client writes only the decoded message payload to stdout and sends status messages to stderr, so its output can be piped straight into other tools:
Pretty-print with jq
python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY | jq .
Filter events
python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY | \
jq 'select(.eventType == "agent.login")'
Save to file
python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY > events.log
Search with grep
python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY | grep "error"
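If jq is not available, the same kind of filtering can be done in Python. This is a minimal sketch that assumes each payload arrives as one JSON object per line; the eventType field comes from the jq example above, while the agentId field and the filter_events name are hypothetical.

```python
import json


def filter_events(lines, event_type):
    """Yield parsed events whose eventType matches; skip blank or non-JSON lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict) and event.get("eventType") == event_type:
            yield event


# Sample lines standing in for the client's stdout (made-up data).
sample = [
    '{"eventType": "agent.login", "agentId": "a1"}',
    '{"eventType": "agent.logout", "agentId": "a1"}',
    "not json",
]
for event in filter_events(sample, "agent.login"):
    print(json.dumps(event))
```

To use it in a pipeline, pass sys.stdin instead of the sample list.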
Key Features
Async/Await Design
The Python client uses modern async/await syntax for efficient WebSocket handling:
# With websockets 12.x and 13.x, websockets.connect() takes extra_headers
async with websockets.connect(url, extra_headers=headers, ssl=True) as websocket:
    async for message in websocket:
        # Process message
        pass
Payload Decoding
Messages are decoded in two steps:
# 1. Parse Pulsar message JSON
pulsar_msg = json.loads(message)
# 2. Decode base64 payload
payload = base64.b64decode(pulsar_msg['payload']).decode('utf-8')
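To try the two decoding steps without a live connection, the sketch below builds a sample frame locally and decodes it again. The messageId value and the event body are made up for illustration, and decode_frame is a hypothetical helper that mirrors extract_pulsar_payload() without the error handling.

```python
import base64
import json


def decode_frame(frame):
    """Parse a Pulsar WebSocket frame and return (message dict, decoded payload string)."""
    msg = json.loads(frame)
    payload = base64.b64decode(msg.get("payload", "")).decode("utf-8")
    return msg, payload


# Build a frame the way a broker might: the event is JSON, base64-encoded into "payload".
event = {"eventType": "agent.login"}
frame = json.dumps({
    "messageId": "CAAQAw==",  # made-up identifier
    "payload": base64.b64encode(json.dumps(event).encode("utf-8")).decode("ascii"),
})

msg, payload = decode_frame(frame)
print(msg["messageId"], payload)
```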
Message Acknowledgement
Acknowledgements are required to prevent message delivery stoppage:
ack_msg = {"messageId": message_id}
await websocket.send(json.dumps(ack_msg))
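To see exactly what the acknowledgement looks like on the wire, the following sketch sends it to a stand-in object instead of a real connection. RecordingSocket is a hypothetical test double, not part of the websockets library, and the messageId value is made up.

```python
import asyncio
import json


class RecordingSocket:
    """Stand-in for a WebSocket connection; records frames instead of sending them."""

    def __init__(self):
        self.sent = []

    async def send(self, data):
        self.sent.append(data)


async def ack(websocket, message_id):
    """Send the acknowledgement frame for one received message."""
    await websocket.send(json.dumps({"messageId": message_id}))


sock = RecordingSocket()
asyncio.run(ack(sock, "CAAQAw=="))
print(sock.sent[0])
```

The messageId should be echoed back exactly as received in the incoming frame.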
Control Message Filtering
Control messages (like end-of-topic markers) should not be processed:
if msg.get("type") or msg.get("endOfTopic"):
    continue
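The check can be exercised on its own with a few sample frames. The frames below are hypothetical shapes chosen to trigger each branch, not captured broker output.

```python
def is_control_message(msg):
    """Return True for frames carrying a 'type' or 'endOfTopic' field."""
    return bool(msg.get("type") or msg.get("endOfTopic"))


# Hypothetical frames: one data message and two control-style messages.
frames = [
    {"messageId": "CAAQAw==", "payload": "e30="},
    {"endOfTopic": True},
    {"type": "isEndOfTopic"},
]

data_frames = [f for f in frames if not is_control_message(f)]
print(len(data_frames))
```

Only the first frame survives the filter, so only it would be printed and acknowledged.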
Clean Output
- stdout: Only decoded message payloads
- stderr: Status messages and errors
This separation ensures piped output remains clean.
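A minimal sketch of this separation, assuming the same approach as the complete example (logging configured with stream=sys.stderr, payloads written with print). The emit helper and its out parameter are hypothetical and exist only to make the behavior easy to check.

```python
import logging
import sys


def emit(payload, out=None):
    """Write one decoded payload to stdout (or to 'out' when given) and flush immediately."""
    print(payload, file=out or sys.stdout, flush=True)


# Status messages go to stderr, so they never mix with piped payloads.
logging.basicConfig(level=logging.INFO, stream=sys.stderr, format="%(levelname)s - %(message)s")

logging.info("connected")              # appears on stderr
emit('{"eventType": "agent.login"}')  # appears on stdout
```

Flushing after each payload matters when stdout is a pipe, because Python otherwise buffers output in blocks and downstream tools would see events late.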
Dependencies
The client requires:
websockets>=12.0,<14.0
Install with:
pip install -r requirements.txt
Virtual Environment
Using a virtual environment is recommended to avoid dependency conflicts:
# Create and activate
python3 -m venv venv
source venv/bin/activate # macOS/Linux
# venv\Scripts\activate # Windows
# Install dependencies
pip install -r requirements.txt
# Run client
python3 pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY
# Deactivate when done
deactivate
Making the Script Executable
chmod +x pulsar_simple_client.py
# Run without python3 prefix
./pulsar_simple_client.py --tenant YOUR_TENANT --x-api-key YOUR_KEY
Error Handling
The client handles common errors:
- Connection errors: WebSocket connection failures
- JSON parse errors: Malformed Pulsar messages
- Base64 decode errors: Invalid payload encoding
- SSL/TLS errors: Certificate verification issues
- Acknowledgement failures: Issues sending acks (logged as warnings)
All errors are logged to stderr without affecting stdout output.
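The decoding-related errors in this list correspond to distinct Python exception types. The sketch below shows one way to tell them apart; safe_decode is a hypothetical helper, and passing validate=True to b64decode is stricter than the original script, which uses the default lenient decoding.

```python
import base64
import binascii
import json


def safe_decode(frame):
    """Return (payload, None) on success or (None, error_kind) on failure."""
    try:
        msg = json.loads(frame)
    except json.JSONDecodeError:
        return None, "json"
    if not isinstance(msg, dict):
        return None, "json"
    try:
        # validate=True rejects characters outside the base64 alphabet (assumption: stricter than the original).
        raw = base64.b64decode(msg.get("payload", ""), validate=True)
    except binascii.Error:
        return None, "base64"
    try:
        return raw.decode("utf-8"), None
    except UnicodeDecodeError:
        return None, "utf-8"


for frame in ["not json", '{"payload": "%%%"}', '{"payload": "/w=="}', '{"payload": "aGk="}']:
    print(safe_decode(frame))
```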
Troubleshooting
ModuleNotFoundError: No module named 'websockets'
Install dependencies:
pip install -r requirements.txt
SSL Certificate Errors
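The script shown above always verifies TLS certificates (it connects with ssl=True) and does not define an --insecure option, so passing --insecure fails with an argument error. If certificate verification fails, check that the hostname is correct and that your system's CA certificates are up to date.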
Connection Refused
- Verify hostname and port
- Check network connectivity
- Ensure firewall allows outbound connections
Python Version Too Old
Check your Python version:
python3 --version
Requires Python 3.8 or higher (websockets 12.0 and later do not support Python 3.7).
Next Steps
- Node.js Client Example - JavaScript/Node.js implementation
- Go Client Example - Go implementation
- Message Format - Understanding message structure
- Troubleshooting - Common issues and solutions