Introduction
Real-time analytics isn't just about speed; it's about making data-driven decisions when they matter most. When building analytics for PinnKET (10K+ daily users), we needed to process transaction events, user interactions, and system metrics in real time to give event organizers immediate insights.
This article walks through building a scalable real-time analytics system that processes millions of events per second, from architecture decisions to implementation details.
The Challenge
Traditional batch processing systems introduce latency that can make insights irrelevant. For PinnKET, event organizers needed:
- Live ticket sales monitoring during events
- Real-time fraud detection and prevention
- Instant capacity management alerts
- Live user engagement metrics
- Dynamic pricing optimization based on demand
System Architecture Overview
Core Components
1. Event Ingestion with Kafka
Apache Kafka serves as our event streaming platform. Here's how we handle high-throughput ingestion:
# Kafka Producer Configuration (Python)
from datetime import datetime
import json

from kafka import KafkaProducer


class EventProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['kafka-cluster:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            batch_size=16384,         # Batch for better throughput
            linger_ms=10,             # Small delay to allow batching
            compression_type='gzip',
            retries=3,
            acks=1                    # Balance between performance and durability
        )

    def send_event(self, event_type, data):
        # producer.send() is already non-blocking and returns a future
        event = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event_type,
            'data': data,
            'version': '1.0'
        }
        # Partition by user_id for ordered per-user processing
        partition_key = str(data.get('user_id', 0))
        self.producer.send(
            f'events.{event_type}',
            value=event,
            key=partition_key.encode('utf-8')
        )
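A quick usage sketch of the producer above; the payload fields here are illustrative, not the production schema:

# Hypothetical usage: record a ticket sale as it happens
producer = EventProducer()
producer.send_event('ticket_sale', {
    'user_id': 42,
    'event_id': '12345',
    'amount': 150.0,
    'currency': 'USD',
})
producer.producer.flush()  # flush pending batches before shutdown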
2. Stream Processing with Kafka Streams
Real-time aggregations and transformations happen in stream processors:
// Stream Processing Example (Java, Kafka Streams with Spring Kafka)
import java.time.Duration;

import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class AnalyticsStreamProcessor {

    // With @EnableKafkaStreams, Spring injects the shared StreamsBuilder
    // and starts the resulting topology for us.
    @Autowired
    public void processTicketSalesStream(StreamsBuilder builder) {
        // Real-time ticket sales aggregation
        KStream<String, TicketSaleEvent> salesStream =
                builder.stream("events.ticket_sale");

        // Window aggregation - 1 minute tumbling windows
        salesStream
            .groupBy((key, value) -> value.getEventId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .aggregate(
                () -> new SalesAggregation(),
                (key, value, aggregate) -> {
                    aggregate.addSale(value.getAmount());
                    return aggregate;
                },
                Materialized.<String, SalesAggregation, WindowStore<Bytes, byte[]>>as("sales-store")
            )
            // Unwrap the windowed key before writing downstream
            .toStream((windowedKey, aggregate) -> windowedKey.key())
            .to("analytics.real_time_sales");

        // Fraud detection stream
        salesStream
            .filter((key, value) -> isSuspiciousTransaction(value))
            .to("alerts.fraud_detection");
    }

    private boolean isSuspiciousTransaction(TicketSaleEvent event) {
        return event.getAmount() > 10000 ||
               isHighVelocityUser(event.getUserId()) ||
               isUnusualLocation(event.getLocation());
    }
}
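Downstream services simply subscribe to the aggregated topics. A minimal Python sketch of a consumer reading the per-minute sales aggregates; the topic name comes from the topology above, but the JSON value format is an assumption (the real serde for SalesAggregation may differ):

# Hypothetical consumer for the aggregated sales topic
import json

from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'analytics.real_time_sales',
    bootstrap_servers=['kafka-cluster:9092'],
    group_id='dashboard-feeder',
    value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    auto_offset_reset='latest',
)

for message in consumer:
    aggregate = message.value
    # e.g. push the per-minute totals to the dashboard layer
    print(message.key, aggregate)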
3. Time-Series Data Storage
We use a hybrid approach with InfluxDB for time-series data and PostgreSQL for relational analytics:
# InfluxDB Setup for Metrics
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS


class MetricsStore:
    def __init__(self):
        self.client = InfluxDBClient(
            url="http://influxdb:8086",
            token="your-token",
            org="your-org"
        )
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)

    def write_metric(self, measurement, tags, fields, timestamp=None):
        point = Point(measurement)
        for tag_key, tag_value in tags.items():
            point = point.tag(tag_key, tag_value)
        for field_key, field_value in fields.items():
            point = point.field(field_key, field_value)
        if timestamp:
            point = point.time(timestamp)
        self.write_api.write(bucket="analytics", record=point)


# Usage example
metrics_store = MetricsStore()
metrics_store.write_metric(
    measurement="ticket_sales",
    tags={"event_id": "12345", "category": "VIP"},
    fields={"count": 1, "revenue": 150.0}
)
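Reading the data back is just as direct. A minimal sketch of querying the last hour of per-event revenue with the same client; the Flux query mirrors the measurement, tag, and field names from the write example above:

# Query the last hour of ticket-sale revenue for one event
query = '''
from(bucket: "analytics")
  |> range(start: -1h)
  |> filter(fn: (r) => r._measurement == "ticket_sales")
  |> filter(fn: (r) => r.event_id == "12345" and r._field == "revenue")
  |> aggregateWindow(every: 1m, fn: sum)
'''

tables = metrics_store.client.query_api().query(query, org="your-org")
for table in tables:
    for record in table.records:
        print(record.get_time(), record.get_value())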
Real-time Dashboard Implementation
WebSocket-based Updates
For real-time dashboard updates, we use WebSockets with FastAPI:
from fastapi import FastAPI, WebSocket
import asyncio
import json
app = FastAPI()
class AnalyticsWebSocket:
def __init__(self):
self.connections = {}
async def connect(self, websocket: WebSocket, event_id: str):
await websocket.accept()
if event_id not in self.connections:
self.connections[event_id] = []
self.connections[event_id].append(websocket)
async def disconnect(self, websocket: WebSocket, event_id: str):
if event_id in self.connections:
self.connections[event_id].remove(websocket)
async def broadcast_update(self, event_id: str, data: dict):
if event_id in self.connections:
disconnected = []
for connection in self.connections[event_id]:
try:
await connection.send_text(json.dumps(data))
except:
disconnected.append(connection)
# Clean up disconnected clients
for conn in disconnected:
self.connections[event_id].remove(conn)
analytics_ws = AnalyticsWebSocket()
@app.websocket("/ws/analytics/{event_id}")
async def websocket_endpoint(websocket: WebSocket, event_id: str):
await analytics_ws.connect(websocket, event_id)
try:
while True:
await websocket.receive_text() # Keep connection alive
except:
await analytics_ws.disconnect(websocket, event_id)
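To feed these sockets, something has to move aggregates from Kafka into broadcast_update. A minimal sketch of that bridge as a startup task in the same module, assuming the aggregates are JSON and carry an event_id field (both assumptions; the production pipeline may wire this differently):

# Hypothetical bridge: consume aggregates and push them to dashboards
import asyncio

from kafka import KafkaConsumer


def consume_aggregates():
    return KafkaConsumer(
        'analytics.real_time_sales',
        bootstrap_servers=['kafka-cluster:9092'],
        group_id='websocket-broadcaster',
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    )


async def broadcast_loop():
    consumer = consume_aggregates()
    loop = asyncio.get_running_loop()
    while True:
        # poll() blocks, so run it off the event loop
        batches = await loop.run_in_executor(None, consumer.poll, 1000)
        for records in batches.values():
            for record in records:
                aggregate = record.value
                event_id = str(aggregate.get('event_id', ''))
                await analytics_ws.broadcast_update(event_id, aggregate)


@app.on_event("startup")
async def start_broadcaster():
    asyncio.create_task(broadcast_loop())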
Performance Optimizations
1. Kafka Optimization
- Partitioning: Used user_id-based partitioning for ordered processing
- Batching: Configured producers for optimal batch sizes
- Compression: Enabled gzip compression for better throughput
- Replication: Set replication factor to 3 for fault tolerance (see the topic-creation sketch after this list)
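A minimal sketch of creating a topic with these partition and replication settings via kafka-python's admin client; the partition count of 12 is an illustrative choice, not the production value:

# Hypothetical topic setup reflecting the settings above
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['kafka-cluster:9092'])
admin.create_topics([
    NewTopic(
        name='events.ticket_sale',
        num_partitions=12,           # illustrative; size to expected throughput
        replication_factor=3,        # fault tolerance across three brokers
        topic_configs={'compression.type': 'gzip'},
    )
])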
2. Database Optimization
- Indexes: Time-based and compound indexes for fast queries
- Retention Policies: Automated data lifecycle management (bucket retention sketched after this list)
- Read Replicas: Separate read replicas for analytics queries
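On the InfluxDB side, retention can be set when the bucket is created. A minimal sketch with the influxdb_client API, assuming a 30-day window (the exact retention period is an assumption):

# Hypothetical bucket with a 30-day retention rule
from influxdb_client import InfluxDBClient, BucketRetentionRules

client = InfluxDBClient(url="http://influxdb:8086", token="your-token", org="your-org")
retention = BucketRetentionRules(type="expire", every_seconds=30 * 24 * 3600)
client.buckets_api().create_bucket(
    bucket_name="analytics",
    retention_rules=retention,
    org="your-org",
)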
Monitoring and Alerting
Key Metrics to Monitor
- Kafka Lag: Consumer lag indicates processing delays
- Processing Rate: Events processed per second (instrumented in the sketch after this list)
- Error Rate: Failed event processing percentage
- Latency: End-to-end event processing time
- Storage: Database growth and query performance
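A minimal sketch of exposing the processing-rate and latency metrics from a Python worker with prometheus_client; the counter name matches the events_processed_total series used in the alert rules below, while the histogram name and buckets are illustrative:

# Hypothetical Prometheus instrumentation for a stream worker
import time

from prometheus_client import Counter, Histogram, start_http_server

EVENTS_PROCESSED = Counter(
    'events_processed_total', 'Events processed', ['event_type']
)
PROCESSING_LATENCY = Histogram(
    'event_processing_seconds', 'End-to-end processing time',
    buckets=(0.01, 0.05, 0.1, 0.5, 1.0, 5.0),
)

start_http_server(9102)  # scrape endpoint for Prometheus


def handle_event(event):
    start = time.perf_counter()
    # ... actual processing ...
    EVENTS_PROCESSED.labels(event_type=event['event_type']).inc()
    PROCESSING_LATENCY.observe(time.perf_counter() - start)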
Alerting Strategy
# Prometheus Alerting Rules
groups:
  - name: analytics-alerts
    rules:
      - alert: HighKafkaLag
        expr: kafka_consumer_lag_sum > 10000
        for: 2m
        labels:
          severity: warning
        annotations:
          summary: "High Kafka consumer lag detected"
      - alert: ProcessingRate
        expr: rate(events_processed_total[5m]) < 1000
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "Event processing rate below threshold"
Deployment and Scaling
Kubernetes Deployment
Our analytics pipeline runs on Kubernetes with auto-scaling:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: stream-processor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: stream-processor
  template:
    metadata:
      labels:
        app: stream-processor
    spec:
      containers:
        - name: processor
          image: analytics/stream-processor:v1.0
          resources:
            requests:
              memory: "512Mi"
              cpu: "250m"
            limits:
              memory: "1Gi"
              cpu: "500m"
          env:
            - name: KAFKA_BROKERS
              value: "kafka-cluster:9092"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: stream-processor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: stream-processor
  minReplicas: 3
  maxReplicas: 10
  metrics:
    - type: Resource
      resource:
        name: cpu
        target:
          type: Utilization
          averageUtilization: 70
Real-World Results
Implementing this architecture for PinnKET resulted in:
- 2M+ events/second processing capacity
- <100ms latency for real-time aggregations
- 99.9% availability during peak events
- 40% cost reduction vs traditional solutions
- 10x faster insights delivery to clients
Lessons Learned
What Worked Well
- Kafka's durability: Never lost events even during failures
- Stream processing: Real-time aggregations without complex infrastructure
- Hybrid storage: InfluxDB + PostgreSQL covered all use cases
- WebSocket updates: Instant dashboard updates improved user experience
Challenges and Solutions
- Backpressure: Implemented circuit breakers and queuing (see the sketch after this list)
- Hot partitions: Better partitioning strategy based on event distribution
- Schema evolution: Implemented schema registry for backward compatibility
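As a flavour of the backpressure handling, here is a minimal sketch of a bounded in-process buffer in front of the Kafka producer; the queue size and the load-shedding policy are illustrative, not the production values, and this stands in for a fuller circuit breaker:

# Hypothetical bounded buffer to absorb bursts before the producer
import queue
import threading

event_buffer = queue.Queue(maxsize=10_000)


def enqueue_event(event_type, data):
    try:
        event_buffer.put_nowait((event_type, data))
    except queue.Full:
        # Buffer full: shed load instead of blocking the request path
        return False
    return True


def drain_buffer(producer: EventProducer):
    while True:
        event_type, data = event_buffer.get()
        producer.send_event(event_type, data)
        event_buffer.task_done()


threading.Thread(target=drain_buffer, args=(EventProducer(),), daemon=True).start()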
Next Steps and Future Improvements
- Machine Learning: Real-time ML inference on event streams
- Multi-region: Cross-region replication for global availability
- Advanced Analytics: Complex event processing for pattern detection
- Edge Processing: Closer-to-source analytics for reduced latency
Conclusion
Building real-time analytics systems requires careful consideration of data flow, processing requirements, and scalability needs. The combination of Kafka, stream processing, and modern databases provides a solid foundation for high-throughput, low-latency analytics.
The key is starting simple and scaling incrementally based on actual usage patterns and requirements. Monitor everything, plan for failures, and always optimize for your specific use case rather than theoretical perfection.