Go Client Example
This page provides a complete Go implementation for connecting to the 8x8 Event Streaming service.
Overview
The Go client example demonstrates:
- WebSocket connection setup
- Authentication using X-API-Key
- Message reading and payload decoding
- Error handling and graceful shutdown
Complete Example
package main
import (
"encoding/base64"
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
"net/url"
"time"
"github.com/gorilla/websocket"
)
// PulsarMessage is the JSON envelope read from the Pulsar WebSocket
// connection. One type covers both data messages and control messages;
// fields absent from a given frame keep their zero value.
type PulsarMessage struct {
	// Fields carried by data messages.
	MessageID       string            `json:"messageId"`
	Payload         string            `json:"payload"` // base64 text; see DecodePayload
	Properties      map[string]string `json:"properties,omitempty"`
	PublishTime     string            `json:"publishTime,omitempty"`
	RedeliveryCount int               `json:"redeliveryCount,omitempty"`

	// Fields carried by control messages.
	Type       string `json:"type,omitempty"`       // e.g., "isEndOfTopic"
	EndOfTopic string `json:"endOfTopic,omitempty"` // "true" or "false"
}

// IsControlMessage reports whether either control field (Type or EndOfTopic)
// is set. Control messages should not be acknowledged.
func (m *PulsarMessage) IsControlMessage() bool {
	if m.Type == "" && m.EndOfTopic == "" {
		return false
	}
	return true
}

// DecodePayload decodes Payload using standard base64 encoding and returns
// the resulting bytes together with any decoding error.
func (m *PulsarMessage) DecodePayload() ([]byte, error) {
	raw, err := base64.StdEncoding.DecodeString(m.Payload)
	return raw, err
}
// extractPulsarPayload unmarshals one JSON frame into a PulsarMessage and
// decodes its base64 payload.
//
// Results by outcome:
//   - the frame is not valid JSON: (nil, nil, error)
//   - the payload is not valid base64: (parsed message, nil, error)
//   - success: (parsed message, decoded payload, nil)
func extractPulsarPayload(data []byte) (*PulsarMessage, []byte, error) {
	parsed := new(PulsarMessage)
	if jsonErr := json.Unmarshal(data, parsed); jsonErr != nil {
		return nil, nil, fmt.Errorf("failed to parse Pulsar message: %w", jsonErr)
	}

	decoded, b64Err := parsed.DecodePayload()
	if b64Err == nil {
		return parsed, decoded, nil
	}
	// The envelope itself parsed, so hand it back alongside the error.
	return parsed, nil, fmt.Errorf("failed to decode payload: %w", b64Err)
}
// sendAck writes an acknowledgment for messageID to conn as a JSON text
// frame of the form {"messageId": "<id>"}.
//
// Acknowledging is REQUIRED for WebSocket readers: without it the backlog
// builds up and message delivery stops.
//
// It returns an error if the ack cannot be marshaled or written.
func sendAck(conn *websocket.Conn, messageID string) error {
	body, marshalErr := json.Marshal(map[string]string{"messageId": messageID})
	if marshalErr != nil {
		return fmt.Errorf("failed to marshal ack message: %w", marshalErr)
	}

	if writeErr := conn.WriteMessage(websocket.TextMessage, body); writeErr != nil {
		return fmt.Errorf("failed to send ack: %w", writeErr)
	}
	return nil
}
// buildURL constructs the Pulsar WebSocket reader URL
//
//	wss://<host>:<port>/ws/v2/reader/persistent/<tenant>/<namespace>/<topic>
//
// tenant, namespace and topic are percent-escaped as individual path
// segments, so characters such as '?', '#', '%' or '/' inside a name cannot
// change the structure of the URL. When xAPIKey is non-empty it is appended
// as the "x-api-key" query parameter.
//
// It returns an error if the assembled string is not a valid URL.
func buildURL(host string, port int, tenant, namespace, topic, xAPIKey string) (string, error) {
	rawURL := fmt.Sprintf(
		"wss://%s:%d/ws/v2/reader/persistent/%s/%s/%s",
		host, port,
		url.PathEscape(tenant), url.PathEscape(namespace), url.PathEscape(topic),
	)

	u, err := url.Parse(rawURL)
	if err != nil {
		return "", fmt.Errorf("invalid URL: %w", err)
	}

	// Add X-API-Key as query parameter if provided
	if xAPIKey != "" {
		q := u.Query()
		q.Set("x-api-key", xAPIKey)
		// Query returns a copy; write the encoded values back to the URL.
		u.RawQuery = q.Encode()
	}

	return u.String(), nil
}
// ConnectAndReceive dials wsURL and processes messages until the connection
// ends.
//
// When xAPIKey is non-empty it is sent as the X-API-Key handshake header.
// For every frame received:
//   - frames that are not valid JSON are logged and skipped;
//   - control messages are skipped without being printed or acknowledged;
//   - data messages have their decoded payload printed to stdout, one per
//     line, and are then acknowledged.
//
// A data message whose payload cannot be base64-decoded is logged and still
// acknowledged: the acknowledgment is what keeps delivery flowing, so leaving
// it out would eventually stall the stream.
//
// It returns nil when the server closes the connection with a normal or
// going-away close code, and a wrapped error for handshake failures
// (including the HTTP status when a response is available) and for any other
// read failure.
func ConnectAndReceive(wsURL string, xAPIKey string) error {
	// Set up HTTP headers for authentication
	headers := http.Header{}
	if xAPIKey != "" {
		headers.Set("X-API-Key", xAPIKey)
	}

	// Configure WebSocket dialer
	dialer := websocket.Dialer{
		HandshakeTimeout: 45 * time.Second,
	}

	log.Printf("Connecting to WebSocket...")
	conn, resp, err := dialer.Dial(wsURL, headers)
	if err != nil {
		if resp != nil {
			return fmt.Errorf("failed to connect to WebSocket (status: %d): %w", resp.StatusCode, err)
		}
		return fmt.Errorf("failed to connect to WebSocket: %w", err)
	}
	defer conn.Close()
	log.Printf("Successfully connected")

	// Read messages continuously
	for {
		// ReadMessage only ever yields text or binary frames; a close frame
		// from the peer is reported through the returned error instead.
		_, message, err := conn.ReadMessage()
		if err != nil {
			if websocket.IsCloseError(err, websocket.CloseNormalClosure, websocket.CloseGoingAway) {
				log.Println("Received close message from server")
				return nil
			}
			// Returned without logging; the caller decides how to report it.
			return fmt.Errorf("failed to read message: %w", err)
		}

		pulsarMsg, payload, err := extractPulsarPayload(message)
		if pulsarMsg == nil {
			// The frame was not valid JSON, so there is no message ID to ack.
			log.Printf("Error extracting payload: %v", err)
			continue
		}

		// Control messages are neither printed nor acknowledged.
		if pulsarMsg.IsControlMessage() {
			continue
		}

		if err != nil {
			// Payload was not valid base64: report it, then fall through to
			// the ack so the message does not stay pending forever.
			log.Printf("Error extracting payload: %v", err)
		} else {
			// Print only the payload
			fmt.Println(string(payload))
		}

		// Nothing to acknowledge without a message ID.
		if pulsarMsg.MessageID == "" {
			continue
		}

		// Send acknowledgment (required for WebSocket flow control)
		if err := sendAck(conn, pulsarMsg.MessageID); err != nil {
			log.Printf("Warning: Failed to send ack: %v", err)
		}
	}
}
// main parses the command-line flags, builds the WebSocket URL and streams
// messages until ConnectAndReceive returns. It exits through log.Fatal when
// -tenant is missing, when the URL cannot be built, or when streaming ends
// with an error.
func main() {
	var (
		host      string
		port      int
		tenant    string
		namespace string
		topic     string
		xAPIKey   string
	)

	// Default host uses the euw2 region; for other regions see developer.8x8.com.
	flag.StringVar(&host, "host", "pulsar-ws-euw2.8x8.com", "Pulsar broker hostname")
	flag.IntVar(&port, "port", 443, "Pulsar broker port")
	flag.StringVar(&tenant, "tenant", "", "Pulsar tenant name (required)")
	flag.StringVar(&namespace, "namespace", "event-v1", "Pulsar namespace")
	flag.StringVar(&topic, "topic", "all", "Pulsar topic name")
	flag.StringVar(&xAPIKey, "x-api-key", "", "X-API-Key header value")
	flag.Parse()

	// The tenant has no usable default, so refuse to start without it.
	if tenant == "" {
		log.Fatal("Error: -tenant is required")
	}

	fullURL, err := buildURL(host, port, tenant, namespace, topic, xAPIKey)
	if err != nil {
		log.Fatalf("Error building URL: %v", err)
	}

	if err = ConnectAndReceive(fullURL, xAPIKey); err != nil {
		log.Fatalf("Error: %v", err)
	}
}
Key Components
Message Structure
The PulsarMessage struct maps to the JSON structure received from Pulsar:
// PulsarMessage mirrors the JSON envelope received from Pulsar. Data
// messages populate the first group of fields; control messages populate
// Type and/or EndOfTopic instead.
type PulsarMessage struct {
	// Data message fields
	MessageID       string            `json:"messageId"`
	Payload         string            `json:"payload"` // Base64 encoded
	Properties      map[string]string `json:"properties,omitempty"`
	PublishTime     string            `json:"publishTime,omitempty"`
	RedeliveryCount int               `json:"redeliveryCount,omitempty"`

	// Control message fields (checked by IsControlMessage)
	Type       string `json:"type,omitempty"`       // e.g., "isEndOfTopic"
	EndOfTopic string `json:"endOfTopic,omitempty"` // "true" or "false"
}
URL Construction
The client builds the WebSocket URL from components:
// Reader endpoint layout: wss://<host>:<port>/ws/v2/reader/persistent/<tenant>/<namespace>/<topic>
baseURL := fmt.Sprintf("wss://%s:%d/ws/v2/reader/persistent/%s/%s/%s",
host, port, tenant, namespace, topic)
Authentication
The X-API-Key is sent both as a request header and as a URL query parameter:
// Sent as a handshake header
headers := http.Header{}
headers.Set("X-API-Key", xAPIKey)
// Also add to URL query params
q := u.Query()
q.Set("x-api-key", xAPIKey)
// Query returns a copy of the parsed values, so write the change back to the URL
u.RawQuery = q.Encode()
Payload Decoding
The payload is base64 encoded and must be decoded before use:
// Decode with standard base64; err is non-nil when the payload is malformed
payload, err := base64.StdEncoding.DecodeString(pulsarMsg.Payload)
Message Acknowledgement
Every data message must be acknowledged; otherwise the backlog builds up and the server stops delivering messages:
// The ack is a JSON text frame carrying the ID of the message being acknowledged
ackMsg := map[string]string{"messageId": messageID}
// Errors are discarded here for brevity; sendAck in the complete example checks both calls
ackJSON, _ := json.Marshal(ackMsg)
conn.WriteMessage(websocket.TextMessage, ackJSON)
Control Message Filtering
Control messages (such as end-of-topic markers) must be skipped; they are neither printed nor acknowledged:
// Inside the read loop: move on to the next frame without printing or acknowledging this one
if pulsarMsg.IsControlMessage() {
continue
}
Running the Example
Prerequisites
# gorilla/websocket is a library, so add it to the module with `go get`
# (`go install` only builds executables and rejects non-main packages)
go mod init pulsar-client
go get github.com/gorilla/websocket@latest
Build and Run
# Build the client binary from main.go
go build -o pulsar-client main.go
# Run: -tenant is required; host, port, namespace and topic fall back to their flag defaults
./pulsar-client \
-tenant YOUR_TENANT \
-x-api-key YOUR_API_KEY
Using Environment Variables
export PULSAR_TENANT=your-tenant
export PULSAR_API_KEY=your-api-key
# Quote the expansions so values containing spaces or glob characters reach the client intact
./pulsar-client \
-tenant "$PULSAR_TENANT" \
-x-api-key "$PULSAR_API_KEY"
Processing with jq
Pipe the output to jq for JSON processing:
# Pretty-print all events (the client writes one decoded payload per line to stdout)
./pulsar-client -tenant YOUR_TENANT -x-api-key YOUR_KEY | jq .
# Filter by event type: keep only events whose eventType equals "agent.login"
./pulsar-client -tenant YOUR_TENANT -x-api-key YOUR_KEY | \
jq 'select(.eventType == "agent.login")'
# Extract specific fields: emit an object holding just eventType and timestamp
./pulsar-client -tenant YOUR_TENANT -x-api-key YOUR_KEY | \
jq '{type: .eventType, time: .timestamp}'
Error Handling
The example includes error handling for:
- Invalid URL construction
- Connection failures with HTTP status codes
- WebSocket read errors
- JSON parsing errors
- Base64 decoding errors
- Acknowledgement send failures
Next Steps
- Java Client Example - See the same functionality in Java
- Message Format - Learn more about message structure
- Troubleshooting - Common issues and solutions