
Implementing Real-time Analytics at Scale

📅 January 20, 2024 • ⏱️ 10 min read • Cloud & DevOps

Introduction

Real-time analytics isn't just about speed: it's about making data-driven decisions when they matter most. When building analytics for PinnKET (10K+ daily users), we needed to process transaction events, user interactions, and system metrics in real time to provide immediate insights for event organizers.

This article walks through how we built a real-time analytics pipeline designed to scale to millions of events per second, from architecture decisions to implementation details.

The Challenge

Traditional batch processing systems introduce latency that can make insights irrelevant by the time they arrive. For PinnKET, event organizers needed to see ticket sales and attendee activity as they happened, not hours later.

System Architecture Overview

Events originate from mobile apps, the web dashboard, and third-party services (via a webhooks handler). Traffic passes through an Nginx API gateway and load balancer into an Apache Kafka event stream. Kafka Streams processors consume those topics and write aggregated results to a hybrid time-series store (InfluxDB alongside PostgreSQL), which is exposed to clients through an analytics API built with FastAPI and WebSockets.

Core Components

1. Event Ingestion with Kafka

Apache Kafka serves as our event streaming platform. Here's how we handle high-throughput ingestion:


# Kafka Producer Configuration (Python)
from kafka import KafkaProducer
from datetime import datetime
import json

class EventProducer:
    def __init__(self):
        self.producer = KafkaProducer(
            bootstrap_servers=['kafka-cluster:9092'],
            value_serializer=lambda x: json.dumps(x).encode('utf-8'),
            batch_size=16384,  # Batch for better throughput
            linger_ms=10,      # Small delay for batching
            compression_type='gzip',
            retries=3,
            acks=1  # Leader acknowledgement only: balance between performance and durability
        )
    
    def send_event(self, event_type, data):
        # producer.send() is non-blocking: it batches in the background and returns a future
        event = {
            'timestamp': datetime.utcnow().isoformat(),
            'event_type': event_type,
            'data': data,
            'version': '1.0'
        }
        
        # Partition by user_id for ordered processing
        partition_key = str(data.get('user_id', 0))
        
        self.producer.send(
            f'events.{event_type}',
            value=event,
            key=partition_key.encode('utf-8')
        )
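
As a quick usage illustration (the payload fields here are hypothetical, not from the production codebase), emitting a ticket sale event looks like this:

# Example usage (illustrative; payload fields are hypothetical)
producer = EventProducer()
producer.send_event(
    'ticket_sale',
    {'user_id': 42, 'event_id': '12345', 'amount': 150.0}
)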

2. Stream Processing with Kafka Streams

Real-time aggregations and transformations happen in stream processors:


# Stream Processing Example (Java)
@Component
public class AnalyticsStreamProcessor {
    
    // Runs once at startup; builder.build() is then passed to a KafkaStreams
    // instance and started (configuration omitted for brevity)
    @PostConstruct
    public void processTicketSalesStream() {
        StreamsBuilder builder = new StreamsBuilder();
        
        // Real-time ticket sales aggregation
        KStream<String, TicketSaleEvent> salesStream = 
            builder.stream("events.ticket_sale");
        
        // Window aggregation - 1 minute tumbling windows
        salesStream
            .groupBy((key, value) -> value.getEventId())
            .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
            .aggregate(
                () -> new SalesAggregation(),
                (key, value, aggregate) -> {
                    aggregate.addSale(value.getAmount());
                    return aggregate;
                },
                Materialized.<String, SalesAggregation, WindowStore<Bytes, byte[]>>as("sales-store")
            )
            .toStream()
            .to("analytics.real_time_sales");
        
        // Fraud detection stream
        salesStream
            .filter((key, value) -> isSuspiciousTransaction(value))
            .to("alerts.fraud_detection");
    }
    
    private boolean isSuspiciousTransaction(TicketSaleEvent event) {
        return event.getAmount() > 10000 || 
               isHighVelocityUser(event.getUserId()) ||
               isUnusualLocation(event.getLocation());
    }
}
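
The velocity check referenced above (isHighVelocityUser) lives in our fraud rules. Purely as a sketch of the idea rather than the production logic, a sliding-window counter along these lines flags users who transact unusually often; the threshold and window size are made-up values:

# Sliding-window velocity check (illustrative; thresholds are hypothetical)
import time
from collections import defaultdict, deque

class VelocityChecker:
    def __init__(self, max_events=5, window_seconds=60):
        self.max_events = max_events
        self.window_seconds = window_seconds
        self.history = defaultdict(deque)  # user_id -> recent event timestamps

    def is_high_velocity(self, user_id):
        now = time.time()
        events = self.history[user_id]
        events.append(now)
        # Evict timestamps that have aged out of the window
        while events and now - events[0] > self.window_seconds:
            events.popleft()
        return len(events) > self.max_events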

3. Time-Series Data Storage

We use a hybrid approach with InfluxDB for time-series data and PostgreSQL for relational analytics:


# InfluxDB Setup for Metrics
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS

class MetricsStore:
    def __init__(self):
        self.client = InfluxDBClient(
            url="http://influxdb:8086",
            token="your-token",
            org="your-org"
        )
        self.write_api = self.client.write_api(write_options=SYNCHRONOUS)
    
    def write_metric(self, measurement, tags, fields, timestamp=None):
        point = Point(measurement)
        
        for tag_key, tag_value in tags.items():
            point = point.tag(tag_key, tag_value)
        
        for field_key, field_value in fields.items():
            point = point.field(field_key, field_value)
        
        if timestamp:
            point = point.time(timestamp)
        
        self.write_api.write(bucket="analytics", record=point)

# Usage example
metrics_store = MetricsStore()
metrics_store.write_metric(
    measurement="ticket_sales",
    tags={"event_id": "12345", "category": "VIP"},
    fields={"count": 1, "revenue": 150.0}
)
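
For the relational half of the hybrid store, per-event rollups can be upserted into PostgreSQL so they can be joined against event and organizer records. This is a minimal sketch, assuming psycopg2 and a hypothetical ticket_sales_summary table rather than our actual schema:

# PostgreSQL rollup sketch (table name and schema are hypothetical)
import psycopg2

def upsert_sales_summary(conn, event_id, tickets_sold, revenue):
    with conn.cursor() as cur:
        cur.execute(
            """
            INSERT INTO ticket_sales_summary (event_id, tickets_sold, revenue)
            VALUES (%s, %s, %s)
            ON CONFLICT (event_id) DO UPDATE
               SET tickets_sold = ticket_sales_summary.tickets_sold + EXCLUDED.tickets_sold,
                   revenue      = ticket_sales_summary.revenue + EXCLUDED.revenue
            """,
            (event_id, tickets_sold, revenue),
        )
    conn.commit()

# Usage example (illustrative)
# upsert_sales_summary(conn, event_id="12345", tickets_sold=1, revenue=150.0)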

Real-time Dashboard Implementation

WebSocket-based Updates

For real-time dashboard updates, we use WebSockets with FastAPI:


from fastapi import FastAPI, WebSocket, WebSocketDisconnect
import json

app = FastAPI()

class AnalyticsWebSocket:
    def __init__(self):
        self.connections = {}
    
    async def connect(self, websocket: WebSocket, event_id: str):
        await websocket.accept()
        if event_id not in self.connections:
            self.connections[event_id] = []
        self.connections[event_id].append(websocket)
    
    async def disconnect(self, websocket: WebSocket, event_id: str):
        # Guard against clients already removed during broadcast cleanup
        if event_id in self.connections and websocket in self.connections[event_id]:
            self.connections[event_id].remove(websocket)
    
    async def broadcast_update(self, event_id: str, data: dict):
        if event_id in self.connections:
            disconnected = []
            for connection in self.connections[event_id]:
                try:
                    await connection.send_text(json.dumps(data))
                except Exception:
                    disconnected.append(connection)
            
            # Clean up disconnected clients
            for conn in disconnected:
                self.connections[event_id].remove(conn)

analytics_ws = AnalyticsWebSocket()

@app.websocket("/ws/analytics/{event_id}")
async def websocket_endpoint(websocket: WebSocket, event_id: str):
    await analytics_ws.connect(websocket, event_id)
    try:
        while True:
            await websocket.receive_text()  # Keep connection alive
    except WebSocketDisconnect:
        await analytics_ws.disconnect(websocket, event_id)
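
Something has to call broadcast_update when fresh aggregates arrive. One way to bridge the processed Kafka topic to the WebSocket layer is a small consumer task; this sketch assumes the aiokafka package and JSON-encoded aggregates that carry an event_id field, neither of which is shown above:

# Bridge from the processed topic to connected dashboards (illustrative)
# Assumes aiokafka and JSON aggregates containing an "event_id" field
from aiokafka import AIOKafkaConsumer

async def stream_sales_to_dashboards():
    consumer = AIOKafkaConsumer(
        'analytics.real_time_sales',
        bootstrap_servers='kafka-cluster:9092',
        value_deserializer=lambda v: json.loads(v.decode('utf-8')),
    )
    await consumer.start()
    try:
        async for message in consumer:
            aggregate = message.value
            # Fan the update out to every dashboard watching this event
            await analytics_ws.broadcast_update(aggregate['event_id'], aggregate)
    finally:
        await consumer.stop()

# Launched, for example, from a FastAPI startup hook:
# asyncio.create_task(stream_sales_to_dashboards())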

Performance Optimizations

1. Kafka Optimization

2. Database Optimization

Monitoring and Alerting

Key Metrics to Monitor

Alerting Strategy


# Prometheus Alerting Rules
groups:
  - name: analytics-alerts
    rules:
    - alert: HighKafkaLag
      expr: kafka_consumer_lag_sum > 10000
      for: 2m
      labels:
        severity: warning
      annotations:
        summary: "High Kafka consumer lag detected"
        
    - alert: LowEventProcessingRate
      expr: rate(events_processed_total[5m]) < 1000
      for: 1m
      labels:
        severity: critical
      annotations:
        summary: "Event processing rate below threshold"
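
These rules assume the processors export their own counters. Below is a minimal sketch of exposing the events_processed_total metric with the prometheus_client library; the port and label are assumptions, not our actual instrumentation:

# Metric export sketch for a stream processor (port and labels are assumptions)
from prometheus_client import Counter, start_http_server

EVENTS_PROCESSED = Counter(
    'events_processed_total',
    'Number of events processed by the analytics pipeline',
    ['event_type'],
)

def start_metrics_endpoint(port=9100):
    # Exposes /metrics for Prometheus to scrape
    start_http_server(port)

def record_event(event_type):
    EVENTS_PROCESSED.labels(event_type=event_type).inc()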

Deployment and Scaling

Kubernetes Deployment

Our analytics pipeline runs on Kubernetes with auto-scaling:


apiVersion: apps/v1
kind: Deployment
metadata:
  name: stream-processor
spec:
  replicas: 3
  selector:
    matchLabels:
      app: stream-processor
  template:
    metadata:
      labels:
        app: stream-processor
    spec:
      containers:
      - name: processor
        image: analytics/stream-processor:v1.0
        resources:
          requests:
            memory: "512Mi"
            cpu: "250m"
          limits:
            memory: "1Gi"
            cpu: "500m"
        env:
        - name: KAFKA_BROKERS
          value: "kafka-cluster:9092"
---
apiVersion: autoscaling/v2
kind: HorizontalPodAutoscaler
metadata:
  name: stream-processor-hpa
spec:
  scaleTargetRef:
    apiVersion: apps/v1
    kind: Deployment
    name: stream-processor
  minReplicas: 3
  maxReplicas: 10
  metrics:
  - type: Resource
    resource:
      name: cpu
      target:
        type: Utilization
        averageUtilization: 70

Real-World Results

Implementing this architecture for PinnKET resulted in:

Lessons Learned

What Worked Well

Challenges and Solutions

Next Steps and Future Improvements

Conclusion

Building real-time analytics systems requires careful consideration of data flow, processing requirements, and scalability needs. The combination of Kafka, stream processing, and modern databases provides a solid foundation for high-throughput, low-latency analytics.

The key is starting simple and scaling incrementally based on actual usage patterns and requirements. Monitor everything, plan for failures, and always optimize for your specific use case rather than theoretical perfection.