kepsilone.com

Projets / Pipeline de streaming crypto en temps réel
PythonApache KafkaWebSocketDocker ComposeTimescaleDBFastAPIGrafana

Pipeline de streaming crypto en temps réel

Ingestion live BTC/USDT — WebSocket · Kafka · TimescaleDB · FastAPI (en cours)

En cours Projet personnel 📅 Jan 2026 – en cours 👤 Seul
Contexte & Problématique

La plupart des projets data démarrent avec un fichier CSV ou une base existante. Ce projet part d’un flux live : les trades BTC/USDT de Binance, en temps réel, événement par événement.

L’objectif n’est pas de construire un système de trading. C’est de maîtriser l’architecture d’un pipeline de streaming de bout en bout — ingestion, transport, stockage, exposition, visualisation — en construisant chaque couche de façon isolée et documentée.

Chaque phase correspond à un composant réel utilisé en production dans les équipes data : WebSocket pour l’ingestion temps réel, Kafka comme bus de messages découplé, TimescaleDB pour les séries temporelles, FastAPI pour l’exposition, Grafana pour la visualisation.

Impact quantifiéFlux live BTC/USDT · 5 composants découplés · pipeline end-to-end en construction phase par phase
Objectifs
01Ingérer des trades en temps réel depuis l'API Binance via WebSocket avec Python async
02Transporter les événements sans perte via Apache Kafka, découplé du consommateur
03Stocker les séries temporelles dans TimescaleDB et les exposer via une API FastAPI
04Visualiser les prix en temps réel avec un dashboard Grafana
05Documenter chaque phase comme un support d'apprentissage structuré et rejouable
Architecture & Stack
Ingestion
Python 3.11 + asyncio + websockets Connexion WebSocket à l'API Binance (btcusdt@trade)
Transport
Apache Kafka 3.x + Zookeeper Images Docker Confluent 7.5.0
Infrastructure
Docker Compose Orchestration locale en une commande
Stockage
TimescaleDB (PostgreSQL + extension séries temporelles) Phase 3 — planifié
API
FastAPI — endpoints /trades, /stats, /latest Phase 4 — planifié
Visualisation
Grafana — dashboard chandeliers temps réel Phase 5 — planifié
Sources de Données
Binance WebSocket API API

Flux temps réel BTC/USDT — endpoint btcusdt@trade — ~1–2 events/s

Résultats & Impact
Pipeline producteur fonctionnel : ingestion live BTC/USDT → topic Kafka opérationnel en local
Infrastructure Docker Compose reproductible en une commande (docker-compose up -d)
Documentation d'apprentissage phase par phase (docs/apprentissage-phase1.md)
Architecture des Phases 3–5 définie et documentée dans le README
Projet open source, structuré comme référence pour apprendre le streaming data engineering
Méthodologie
Voir la méthodologie

Le projet est construit en 5 phases successives. Chaque phase ajoute un composant au pipeline existant sans modifier ce qui précède. Ce découpage est intentionnel : il force à comprendre chaque couche isolément avant de l’intégrer.

La Phase 1 connecte Python à Binance et affiche les trades en console. La Phase 2 introduit Kafka comme couche de transport. Les Phases 3 à 5 ajoutent stockage, API et visualisation. L’infrastructure Docker Compose évolue à chaque phase pour intégrer les nouveaux services.

01
Phase 1 — WebSocket → Console
Connexion à l'API Binance, parsing JSON, normalisation des champs (T, p, q → timestamp, prix, quantite), affichage en console. Validation que le flux arrive correctement.
02
Phase 2 — Intégration Kafka
Ajout du producteur Kafka, mise en place de l'infrastructure Docker (Zookeeper + broker), publication des trades normalisés dans le topic crypto-trades.
03
Phase 3 — Consommateur + TimescaleDB
Consommateur Python qui lit le topic Kafka et insère en base. Création du schéma avec hypertable TimescaleDB indexé sur le timestamp.
04
Phase 4 — API FastAPI
Endpoints /trades, /stats, /latest pour exposer les données stockées avec filtres temporels.
05
Phase 5 — Grafana + Docker Compose complet
Dashboard temps réel connecté à TimescaleDB, orchestration complète de tous les services dans un unique docker-compose.yml.
Challenges & Arbitrages
Challenges rencontrés
⚠ Noms de champs opaques dans l'API Binance

Problème : L'API Binance utilise des champs nommés T, p, q — sans documentation, ils sont illisibles. Propager ces noms dans Kafka contaminerait tous les consommateurs.

Solution : Normalisation des noms au niveau du producteur, avant l'entrée dans Kafka : T → timestamp, p → prix, q → quantite. Un seul point de modification si la source change.

⚠ Configuration KAFKA_ADVERTISED_LISTENERS

Problème : Kafka démarre correctement mais les clients ne peuvent pas publier. Le broker communique une adresse inconnue aux producers.

Solution : Séparation explicite de deux listeners dans Docker Compose — PLAINTEXT_INTERNAL pour la communication inter-containers, PLAINTEXT_LOCAL pour les connexions depuis le host (localhost:9092).

Arbitrages techniques
⚖ Kafka vs connexion directe producteur-consommateur

Kafka retenu malgré la complexité de setup. Une connexion directe couperait les données si le consommateur est indisponible et forcerait à modifier le producteur pour chaque nouveau consommateur. Kafka résout les deux problèmes avec un bus persistant.

⚖ TimescaleDB vs InfluxDB pour le stockage

TimescaleDB choisi pour rester dans l'écosystème PostgreSQL. SQL standard, extensions connues, pas de query language propriétaire. InfluxDB impose Flux ou InfluxQL — une dépendance supplémentaire à apprendre.

⚖ asyncio vs threading pour le WebSocket

asyncio utilisé car le WebSocket est I/O-bound — le programme passe la majorité du temps à attendre le prochain message. L'async permet de gérer plusieurs flux en parallèle sans bloquer le thread principal.

Code & Ressources
Voir les snippets
Producteur WebSocket → Kafka (Python async)python
import asyncio, json, websockets
from kafka import KafkaProducer
from datetime import datetime

URL = "wss://stream.binance.com:9443/ws/btcusdt@trade"

producer_kafka = KafkaProducer(
    bootstrap_servers="localhost:9092",
    value_serializer=lambda x: json.dumps(x).encode("utf-8")
)

async def connect():
    async with websockets.connect(URL) as ws:
        async for message in ws:
            data = json.loads(message)
            payload = {
                "timestamp": datetime.utcfromtimestamp(data["T"] / 1000).isoformat(),
                "prix": float(data["p"]),
                "quantite": float(data["q"])
            }
            producer_kafka.send("crypto-trades", value=payload)
            print(payload)

asyncio.run(connect())
Infrastructure Kafka locale (Docker Compose)yaml
version: "3.8"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.5.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT_LOCAL://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT_LOCAL:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT_INTERNAL
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
Learnings
Le découplage est une décision d'architecture, pas de performance
Kafka n'est pas justifié ici par la charge — un seul producteur, un seul consommateur. Il est justifié par la flexibilité future : chaque nouveau composant (alerte, modèle ML, second dashboard) se branche sur le topic sans toucher au producteur.
Normaliser les données à la source réduit la dette technique
Reporter la normalisation au niveau du consommateur ou de l'API crée une dépendance à la structure de la source dans chaque composant. Normaliser dans le producteur isole ce couplage à un seul endroit.
L'async Python est une contrainte structurelle pour le streaming
Ce n'est pas une optimisation de performance — c'est la condition pour ne pas bloquer le programme entier pendant l'attente du prochain événement WebSocket. Sans async, un second flux ou un timeout simple rendrait le code ingérable.
Prochaines étapes
  • Implémenter le consommateur Python + insertion TimescaleDB (Phase 3)
  • Créer l'API FastAPI avec les endpoints /trades, /stats, /latest (Phase 4)
  • Ajouter Grafana et compléter le Docker Compose avec tous les services (Phase 5)
  • Publier l'article de projet associé sur kepsilone.com