Skip to main content

Java Client Example

This page provides a complete Java implementation for connecting to the 8x8 Event Streaming service.

Overview

The Java client example demonstrates:

  • WebSocket connection using Java-WebSocket library
  • Authentication using X-API-Key
  • Message reading and payload decoding
  • Command-line argument parsing
  • Error handling with proper logging

Complete Example

package com._8x8.pulsar;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Data;
import org.java_websocket.client.WebSocketClient;
import org.java_websocket.handshake.ServerHandshake;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.net.URI;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;

/**
* Simplified WebSocket client for consuming 8x8 streaming events via Apache Pulsar.
* <p>
* This client connects to Pulsar's WebSocket Reader endpoint and outputs raw message payloads
* suitable for piping to tools like jq.
*/
public class SimpleClient {
private static final Logger log = LoggerFactory.getLogger(SimpleClient.class);
private static final ObjectMapper objectMapper = new ObjectMapper();

/**
* Pulsar message structure from WebSocket.
*/
@Data
static class PulsarMessage {
// Data message fields
@JsonProperty("messageId")
private String messageId;

@JsonProperty("payload")
private String payload; // Base64 encoded

@JsonProperty("properties")
private Map<String, String> properties = new HashMap<>();

@JsonProperty("publishTime")
private String publishTime;

@JsonProperty("redeliveryCount")
private int redeliveryCount;

// Control message fields
@JsonProperty("type")
private String type; // e.g., "isEndOfTopic"

@JsonProperty("endOfTopic")
private String endOfTopic; // "true" or "false"

/**
* Check if this is a control message (should not be acknowledged).
*/
public boolean isControlMessage() {
return (type != null && !type.isEmpty()) || (endOfTopic != null && !endOfTopic.isEmpty());
}

/**
* Decode the base64 payload.
*/
public byte[] decodePayload() {
return Base64.getDecoder().decode(payload);
}

/**
* Decode payload as UTF-8 string.
*/
public String decodePayloadAsString() {
return new String(decodePayload(), StandardCharsets.UTF_8);
}
}

/**
* Build Pulsar WebSocket URL.
*/
private static String buildUrl(String host, int port, String tenant, String namespace,
String topic, String xApiKey) {
try {
String baseUrl = String.format("wss://%s:%d/ws/v2/reader/persistent/%s/%s/%s",
host, port, tenant, namespace, topic);

if (xApiKey != null && !xApiKey.isEmpty()) {
baseUrl += "?x-api-key=" + URLEncoder.encode(xApiKey, StandardCharsets.UTF_8);
}

return baseUrl;
} catch (Exception e) {
throw new RuntimeException("Failed to build URL", e);
}
}

/**
* WebSocket client implementation.
*/
static class PulsarWebSocketClient extends WebSocketClient {
private final CountDownLatch latch = new CountDownLatch(1);
private final String xApiKey;

public PulsarWebSocketClient(URI serverUri, String xApiKey) {
super(serverUri);
this.xApiKey = xApiKey;

// Add X-API-Key header if provided
if (xApiKey != null && !xApiKey.isEmpty()) {
addHeader("X-API-Key", xApiKey);
}

// Set connection timeout
setConnectionLostTimeout(45);
}

@Override
public void onOpen(ServerHandshake handshake) {
log.info("Successfully connected");
}

@Override
public void onMessage(String message) {
try {
// Parse Pulsar message
PulsarMessage pulsarMsg = objectMapper.readValue(message, PulsarMessage.class);

// Check if this is a control message (don't print or ack these)
if (pulsarMsg.isControlMessage()) {
return;
}

// Decode and print only the payload
String payload = pulsarMsg.decodePayloadAsString();
System.out.println(payload);

// Send acknowledgment (required for WebSocket flow control)
sendAck(pulsarMsg.getMessageId());

} catch (Exception e) {
log.error("Error processing message: {}", e.getMessage());
}
}

/**
* Send an acknowledgment message for a received message.
* This is REQUIRED for WebSocket readers to prevent backlog buildup and message delivery stoppage.
*/
private void sendAck(String messageId) {
try {
Map<String, String> ackMsg = new HashMap<>();
ackMsg.put("messageId", messageId);
String ackJson = objectMapper.writeValueAsString(ackMsg);
send(ackJson);
} catch (Exception e) {
log.warn("Failed to send ack for messageId: {}", messageId, e);
}
}

@Override
public void onClose(int code, String reason, boolean remote) {
log.info("Connection closed: {} - {}", code, reason);
latch.countDown();
}

@Override
public void onError(Exception ex) {
log.error("WebSocket error: {}", ex.getMessage());
latch.countDown();
}

public void awaitClose() throws InterruptedException {
latch.await();
}
}

/**
* Parse command-line arguments.
*/
private static Map<String, String> parseArgs(String[] args) {
Map<String, String> params = new HashMap<>();

// Defaults
// Example uses euw2 region. For other regions, see developer.8x8.com
params.put("host", "pulsar-ws-euw2.8x8.com");
params.put("port", "443");
params.put("namespace", "event-v1");
params.put("topic", "all");

// Get API key from environment variable
String apiKey = System.getenv("PULSAR_API_KEY");
if (apiKey != null && !apiKey.isEmpty()) {
params.put("x-api-key", apiKey);
}

for (int i = 0; i < args.length; i++) {
String arg = args[i];

if (arg.startsWith("--") && i + 1 < args.length) {
String key = arg.substring(2);
String value = args[++i];
params.put(key, value);
}
}

return params;
}

/**
* Main entry point.
*/
public static void main(String[] args) {
PulsarWebSocketClient client = null;
try {
// Parse arguments
Map<String, String> params = parseArgs(args);

// Validate required parameters
if (!params.containsKey("tenant")) {
System.err.println("Error: --tenant is required");
System.exit(1);
}

// Build URL
String url = buildUrl(
params.get("host"),
Integer.parseInt(params.get("port")),
params.get("tenant"),
params.get("namespace"),
params.get("topic"),
params.get("x-api-key")
);

log.info("Connecting to WebSocket...");

// Create and connect WebSocket client
client = new PulsarWebSocketClient(
new URI(url),
params.get("x-api-key")
);

// Connect (blocking)
if (!client.connectBlocking()) {
log.error("Failed to connect to WebSocket");
System.exit(1);
}

// Wait for connection to close
client.awaitClose();

} catch (Exception e) {
log.error("Error: {}", e.getMessage(), e);
System.exit(1);
} finally {
if (client != null) {
try {
client.close();
} catch (Exception e) {
log.warn("Error closing WebSocket client: {}", e.getMessage());
}
}
}
}
}

Key Components

Message Structure

The PulsarMessage class uses Jackson annotations for JSON mapping:

@Data
static class PulsarMessage {
// Data message fields
@JsonProperty("messageId")
private String messageId;

@JsonProperty("payload")
private String payload; // Base64 encoded

@JsonProperty("properties")
private Map<String, String> properties;

@JsonProperty("publishTime")
private String publishTime;

@JsonProperty("redeliveryCount")
private int redeliveryCount;

// Control message fields
@JsonProperty("type")
private String type;

@JsonProperty("endOfTopic")
private String endOfTopic;
}

URL Construction

The URL is built with proper encoding:

String baseUrl = String.format("wss://%s:%d/ws/v2/reader/persistent/%s/%s/%s",
host, port, tenant, namespace, topic);

if (xApiKey != null && !xApiKey.isEmpty()) {
baseUrl += "?x-api-key=" + URLEncoder.encode(xApiKey, StandardCharsets.UTF_8);
}

Authentication

X-API-Key is added as a header during connection:

if (xApiKey != null && !xApiKey.isEmpty()) {
addHeader("X-API-Key", xApiKey);
}

Payload Decoding

The payload is decoded from base64:

public byte[] decodePayload() {
return Base64.getDecoder().decode(payload);
}

public String decodePayloadAsString() {
return new String(decodePayload(), StandardCharsets.UTF_8);
}

Message Acknowledgement

Acknowledgements are required to prevent message delivery stoppage:

Map<String, String> ackMsg = new HashMap<>();
ackMsg.put("messageId", messageId);
String ackJson = objectMapper.writeValueAsString(ackMsg);
send(ackJson);

Control Message Filtering

Control messages (like end-of-topic markers) should not be processed:

if (pulsarMsg.isControlMessage()) {
return;
}

Maven Dependencies

Add these dependencies to your pom.xml:

<dependencies>
<!-- WebSocket client -->
<dependency>
<groupId>org.java-websocket</groupId>
<artifactId>Java-WebSocket</artifactId>
<version>1.5.7</version>
</dependency>

<!-- JSON processing -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.18.1</version>
</dependency>

<!-- Lombok for reducing boilerplate -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.36</version>
<scope>provided</scope>
</dependency>

<!-- Logging -->
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.5.12</version>
</dependency>
</dependencies>

Building and Running

Build with Maven

mvn clean package

Run the JAR

java -jar target/pulsar-simple-client.jar \
--tenant YOUR_TENANT \
--x-api-key YOUR_API_KEY

Using Environment Variables

export PULSAR_API_KEY=your-api-key

java -jar target/pulsar-simple-client.jar \
--tenant YOUR_TENANT

Configuration Options

OptionDescriptionDefault
--tenantYour 8x8 tenant name (required)-
--hostPulsar broker hostnamepulsar-ws-euw2.8x8.com (EUW2)
--portPulsar broker port443
--namespacePulsar namespaceevent-v1
--topicTopic nameall
--x-api-keyAPI key for authentication(from PULSAR_API_KEY env var)

Error Handling

The example includes error handling for:

  • Connection failures
  • Message parsing errors
  • Base64 decoding errors
  • WebSocket errors and disconnections
  • Acknowledgement send failures (logged as warnings)

Errors are logged using SLF4J with Logback.

Next Steps