› Python for Algo Trading
🎉 Phase 2 Complete!
Lesson 17 · Phase 2 Finale

Building a Live Data Pipeline

Wire everything together — session init, warm-up data, WebSocket ticks, candle assembly, indicator engine, signal engine, and trade logging — into one production-ready pipeline.

55 min
250+ lines Python
Phase 2 · Lesson 8 of 8
Phase 2 Complete

One Pipeline to Rule Them All

Every lesson in Phase 2 taught you one piece. Today we assemble them into a single, coherent system that runs live during market hours. When the session ends you have a complete JSONL log of every signal your system generated — ready for backtesting or review.

KiteConnect (Auth + Session) → Historical (Warm-up Data) → KiteTicker (WebSocket) → TickToCandle (OHLCV Builder) → Indicator Engine → Signal Engine → JSONL Trade Log
What you'll build today

A single pipeline.py file (~250 lines) that is the entry point for your live trading session. No Jupyter notebooks — pure production Python you can run from the terminal.

Project Structure

Keep your project clean from day one. A flat src/ layout works well for algo systems.

TREE · project layout
my_algo/
├── config.json          # API keys  (never commit!)
├── instruments.csv      # Downloaded once at session start
├── cache/               # Historical data CSVs
│   ├── NIFTY_1min.csv
│   └── BANKNIFTY_1min.csv
├── logs/
│   └── signals_2025-01-15.jsonl   # Daily trade log
├── pipeline.py          # ← main entry point  (this lesson)
├── data_loader.py       # MarketDataLoader (L15)
├── indicators.py        # sma/ema/rsi/vwap/macd/supertrend (L13)
├── tick_to_candle.py    # TickToCandle (L16)
└── utils.py             # is_trading_time, logger, etc.
config.json must be in .gitignore

Your api_key and access_token are secret credentials. Add config.json and logs/ to .gitignore before your first commit.
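For reference, config.json is just a two-key JSON file. A minimal sketch that writes placeholder values (the key names match what load_config expects; never commit real credentials):

```python
import json
from pathlib import Path

# Shape of config.json — placeholder values only
cfg = {
    "api_key": "your_api_key",
    "access_token": "todays_access_token",
}
Path("config.json").write_text(json.dumps(cfg, indent=2))

# .gitignore should contain at least:
#   config.json
#   logs/
```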

Config Loading & Session Init

The pipeline starts by loading credentials and verifying the KiteConnect session. If the access token has expired, the pipeline exits with a clear error message rather than failing silently later.

PYTHON · pipeline.py — imports & config
import json, time, logging, signal, sys
from datetime import datetime, date
from pathlib import Path
import pandas as pd
from kiteconnect import KiteConnect, KiteTicker

from data_loader import MarketDataLoader
from indicators  import sma, ema, rsi, vwap, macd, supertrend
from tick_to_candle import TickToCandle

# ── Logging ──────────────────────────────────────────────
Path("logs").mkdir(exist_ok=True)   # FileHandler needs the directory to exist
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(f"logs/pipeline_{date.today()}.log")
    ]
)
log = logging.getLogger("pipeline")

# ── Constants ────────────────────────────────────────────
NIFTY_TOKEN  = 256265
BN_TOKEN     = 260105
SUBSCRIBE    = [NIFTY_TOKEN, BN_TOKEN]

MARKET_OPEN  = "09:15"
MARKET_CLOSE = "15:30"
LAST_ENTRY   = "15:15"   # no new signals after this

# ── Config ───────────────────────────────────────────────
def load_config() -> dict:
    path = Path("config.json")
    if not path.exists():
        log.error("config.json not found — run auth flow first")
        sys.exit(1)
    with path.open() as f:
        return json.load(f)

def init_kite(cfg: dict) -> KiteConnect:
    kite = KiteConnect(api_key=cfg["api_key"])
    kite.set_access_token(cfg["access_token"])
    try:
        profile = kite.profile()
        log.info(f"Session OK — user: {profile['user_name']}")
    except Exception as e:
        log.error(f"Session invalid: {e}. Re-run auth flow.")
        sys.exit(1)
    return kite

Warm-Up Historical Data

Indicators like RSI(14) or SMA(20) need at least n completed candles before they produce valid values. We fetch historical data before market open so our first live candle already has valid indicator values.
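You can see the warm-up requirement directly: a 20-period rolling mean stays NaN until 20 closes exist.

```python
import pandas as pd

close = pd.Series(range(1, 26), dtype=float)   # 25 synthetic closes
sma20 = close.rolling(20).mean()

assert sma20.iloc[:19].isna().all()   # first 19 values: not enough history
assert not pd.isna(sma20.iloc[19])    # valid from the 20th candle onward
```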

PYTHON · pipeline.py — warm-up
# Minimum candles each indicator needs
WARMUP_CANDLES = {
    "rsi":        14,
    "ema_slow":   26,   # MACD slow EMA
    "supertrend": 10,   # ATR period
    "sma200":    200,
}
WARMUP_NEEDED = max(WARMUP_CANDLES.values()) + 20  # buffer

def warmup(kite: KiteConnect) -> dict[int, pd.DataFrame]:
    """Return {token: df} with warm-up OHLCV for each symbol."""
    loader = MarketDataLoader(kite, cache_dir="cache")
    symbols = {
        NIFTY_TOKEN: ("NSE", "NIFTY 50"),
        BN_TOKEN:    ("NSE", "NIFTY BANK"),
    }
    dfs = {}
    for token, (exchange, name) in symbols.items():
        log.info(f"Loading warm-up data for {name} ...")
        df = loader.load(
            instrument_token=token,
            interval="minute",
            candles_needed=WARMUP_NEEDED
        )
        # Pre-compute indicators on warm-up slice
        df = compute_indicators(df)
        dfs[token] = df
        log.info(f"  → {len(df)} candles loaded for {name}")
    return dfs
Cache-first loading

MarketDataLoader.load() checks cache/ first. On subsequent days the CSV is already there — only the missing recent candles are fetched via the API. Cold start on day 1 takes ~10s; warm start takes under 1s.
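MarketDataLoader was built in Lesson 15; the cache-first idea can be sketched like this (function and file names here are illustrative, not the real class):

```python
from pathlib import Path
import pandas as pd

def cache_first_load(cache_dir: str, name: str, fetch_fn, candles_needed: int) -> pd.DataFrame:
    """Illustrative sketch: serve candles from a CSV cache, hit the API only on a miss."""
    path = Path(cache_dir) / f"{name}_1min.csv"
    if path.exists():
        df = pd.read_csv(path, index_col=0, parse_dates=True)
        if len(df) >= candles_needed:
            return df.tail(candles_needed)    # warm start: no API call
    df = fetch_fn(candles_needed)             # cold start: fetch via the API
    path.parent.mkdir(parents=True, exist_ok=True)
    df.to_csv(path)
    return df
```

On day 2 the second branch never runs, which is where the under-1s warm start comes from.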

Indicator Engine

The indicator engine takes a DataFrame of OHLCV candles and returns the same DataFrame enriched with all indicator columns. It works identically on warm-up historical data and on live candles โ€” same function, same output.

PYTHON · pipeline.py — indicators
def compute_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add all indicator columns. Works on any-length DataFrame."""
    df = df.copy()
    c = df["close"]

    # Trend indicators
    df["sma20"]  = sma(c, 20)
    df["sma200"] = sma(c, 200)
    df["ema9"]   = ema(c, 9)

    # Momentum
    df["rsi14"]  = rsi(c, 14)

    # Volatility / trend direction
    st = supertrend(df, atr_period=10, multiplier=3.0)
    df["st_dir"] = st["direction"]   # +1 = bullish, -1 = bearish
    df["st_val"] = st["supertrend"]

    # Volume-weighted price
    df["vwap"]   = vwap(df)

    # MACD
    mc = macd(c)
    df["macd"]     = mc["macd"]
    df["macd_sig"] = mc["signal"]
    df["macd_hist"]= mc["histogram"]

    return df
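indicators.py comes from Lesson 13 and is not reproduced here; as a reminder, a Wilder-smoothed RSI consistent with the rsi(c, 14) call above can be sketched as:

```python
import pandas as pd

def rsi(close: pd.Series, period: int = 14) -> pd.Series:
    """Wilder RSI: smoothed average gain vs. average loss, scaled 0-100."""
    delta = close.diff()
    gain = delta.clip(lower=0).ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
    loss = (-delta.clip(upper=0)).ewm(alpha=1 / period, min_periods=period, adjust=False).mean()
    return 100 - 100 / (1 + gain / loss)
```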

Signal Engine

The signal engine reads the enriched DataFrame (latest row) and produces a structured signal dict. Every signal gets logged to JSONL for post-session review.

PYTHON · pipeline.py — signals
import numpy as np

def generate_signal(df: pd.DataFrame, symbol: str) -> dict | None:
    """
    Evaluate the latest candle against all conditions.
    Returns a signal dict or None if no trade.
    """
    if len(df) < 202:          # need enough history for SMA200
        return None

    row   = df.iloc[-1]
    prev  = df.iloc[-2]

    # ── LONG conditions ──────────────────────────────────────────
    long_conditions = [
        row["close"]    >  row["sma200"],    # price above 200 SMA
        row["st_dir"]   == 1,               # supertrend bullish
        row["close"]    >  row["vwap"],      # above VWAP
        row["rsi14"]    >  55,               # momentum positive
        prev["macd_hist"] < 0 < row["macd_hist"],  # MACD crossover
    ]

    # ── SHORT conditions ─────────────────────────────────────────
    short_conditions = [
        row["close"]    <  row["sma200"],
        row["st_dir"]   == -1,
        row["close"]    <  row["vwap"],
        row["rsi14"]    <  45,
        prev["macd_hist"] > 0 > row["macd_hist"],
    ]

    # Require ALL conditions to be met (strict filter)
    direction = None
    if all(long_conditions):
        direction = "LONG"
    elif all(short_conditions):
        direction = "SHORT"
    else:
        return None

    # ATR-based stop & target (true range, Wilder-style average)
    tr = pd.concat([
        df["high"] - df["low"],
        (df["high"] - df["close"].shift()).abs(),
        (df["low"]  - df["close"].shift()).abs(),
    ], axis=1).max(axis=1)
    atr   = tr.rolling(14).mean().iloc[-1]
    entry = row["close"]
    sl    = entry - 1.5 * atr if direction == "LONG" else entry + 1.5 * atr
    tgt   = entry + 3.0 * atr if direction == "LONG" else entry - 3.0 * atr

    return {
        "ts":        row.name.isoformat(),
        "symbol":    symbol,
        "direction": direction,
        "entry":     round(entry, 2),
        "sl":        round(sl,    2),
        "target":    round(tgt,   2),
        "rsi14":     round(row["rsi14"], 1),
        "st_dir":    int(row["st_dir"]),
        "conditions_met": sum(long_conditions if direction=="LONG" else short_conditions),
    }
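The strict all() filter is easy to sanity-check in isolation with a hand-built row (values illustrative):

```python
import pandas as pd

# Hand-built "latest" and "previous" rows mirroring the enriched columns
row  = pd.Series({"close": 101.0, "sma200": 100.0, "st_dir": 1,
                  "vwap": 100.5, "rsi14": 60.0, "macd_hist": 0.2})
prev = pd.Series({"macd_hist": -0.1})

long_conditions = [
    row["close"] > row["sma200"],
    row["st_dir"] == 1,
    row["close"] > row["vwap"],
    row["rsi14"] > 55,
    prev["macd_hist"] < 0 < row["macd_hist"],
]
assert all(long_conditions)          # every filter fires: LONG signal
assert sum(long_conditions) == 5     # what conditions_met records in the log
```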

Trade Logger (JSONL)

JSONL (JSON Lines) is the ideal format for trade logs — each line is a valid JSON object, easy to append, easy to query with pandas later. One file per day.
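One logged line is just a JSON object serialised onto a single line; for example (values illustrative):

```python
import json

line = json.dumps({"ts": "2025-01-15T10:42:00", "symbol": "NIFTY 50",
                   "direction": "LONG", "entry": 23412.5,
                   "sl": 23360.0, "target": 23517.5})
print(line)   # one line of the day's .jsonl file
```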

PYTHON · pipeline.py — logger
class TradeLogger:
    def __init__(self):
        Path("logs").mkdir(exist_ok=True)
        self.path = Path(f"logs/signals_{date.today()}.jsonl")
        log.info(f"Trade log → {self.path}")

    def write(self, signal: dict):
        with self.path.open("a") as f:
            f.write(json.dumps(signal) + "\n")
        log.info(
            f"SIGNAL │ {signal['symbol']:12s} │ {signal['direction']:5s} │ "
            f"entry={signal['entry']} │ sl={signal['sl']} │ tgt={signal['target']}"
        )

# Load the day's log back for analysis:
def load_signals(path: str) -> pd.DataFrame:
    records = []
    with open(path) as f:
        for line in f:
            records.append(json.loads(line.strip()))
    df = pd.DataFrame(records)
    df["ts"] = pd.to_datetime(df["ts"])
    return df.set_index("ts")
Column          Type            Description
ts              ISO datetime    Candle close timestamp
symbol          string          NIFTY 50 or NIFTY BANK
direction       LONG / SHORT    Trade direction
entry           float           Close price at signal
sl              float           Stop-loss (1.5× ATR)
target          float           Target (3× ATR = 2:1 R:R)
rsi14           float           RSI at signal candle
st_dir          int (±1)        Supertrend direction at signal
conditions_met  int (0-5)       Debug: how many filters fired

Candle Callback & Main Loop

When a new 1-minute candle closes, TickToCandle fires on_candle_close(). We append the candle to our warm-up DataFrame, recompute indicators on the capped buffer, run the signal engine, and log any signal — all comfortably within the one-minute candle interval.

PYTHON · pipeline.py — callback & WebSocket
def is_trading_time() -> bool:
    now = datetime.now().strftime("%H:%M")
    return MARKET_OPEN <= now <= MARKET_CLOSE

def new_entries_allowed() -> bool:
    now = datetime.now().strftime("%H:%M")
    return MARKET_OPEN <= now <= LAST_ENTRY


class Pipeline:
    def __init__(self, kite: KiteConnect):
        self.kite    = kite
        self.logger  = TradeLogger()
        self.buffers : dict[int, pd.DataFrame] = {}    # token โ†’ live df
        self.names   = {NIFTY_TOKEN: "NIFTY 50", BN_TOKEN: "NIFTY BANK"}

        # Pre-load warm-up data
        self.buffers = warmup(kite)

        # One TickToCandle builder per symbol
        self.builders = {
            token: TickToCandle(
                interval_minutes=1,
                on_candle_close=lambda candle, t=token: self.on_candle(t, candle)
            )
            for token in SUBSCRIBE
        }

    def on_candle(self, token: int, candle: dict):
        """Called each time a 1-min candle closes."""
        if not is_trading_time():
            return

        symbol = self.names[token]

        # 1. Append new candle to buffer
        row = pd.DataFrame([candle]).set_index("datetime")
        self.buffers[token] = pd.concat(
            [self.buffers[token], row]
        ).tail(500)  # keep last 500 candles (≈ 1.3 trading days of 1-min bars)

        # 2. Recompute indicators on the capped 500-row buffer (fast enough per minute)
        df = compute_indicators(self.buffers[token])
        self.buffers[token] = df

        # 3. Signal engine
        if new_entries_allowed():
            sig = generate_signal(df, symbol)
            if sig:
                self.logger.write(sig)

    def start(self):
        """Connect WebSocket and block until session ends."""
        kws = KiteTicker(
            api_key=self.kite.api_key,            # KiteConnect keeps these from init
            access_token=self.kite.access_token
        )

        def on_ticks(ws, ticks):
            for tick in ticks:
                token = tick["instrument_token"]
                if token in self.builders:
                    self.builders[token].on_tick(
                        ltp=tick["last_price"],
                        volume=tick.get("volume", 0),
                        timestamp=tick["exchange_timestamp"]
                    )

        def on_connect(ws, response):
            log.info("WebSocket connected — subscribing ...")
            ws.subscribe(SUBSCRIBE)
            ws.set_mode(ws.MODE_FULL, SUBSCRIBE)

        def on_close(ws, code, reason):
            log.warning(f"WebSocket closed: {code} {reason}")

        def on_error(ws, code, reason):
            log.error(f"WebSocket error: {code} {reason}")

        kws.on_ticks   = on_ticks
        kws.on_connect = on_connect
        kws.on_close   = on_close
        kws.on_error   = on_error

        log.info("Starting WebSocket (threaded) ...")
        kws.connect(threaded=True)

        # Block main thread until market close (or until Ctrl+C flips _running)
        while _running and is_trading_time():
            time.sleep(30)

        log.info("Market closed — stopping pipeline.")
        kws.close()
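A side note on the time-window helpers above: zero-padded HH:MM strings sort lexicographically in the same order as clock time, which is why plain string comparison is safe here.

```python
# Zero-padded HH:MM strings order the same way as the times they represent
assert "09:15" <= "09:30" <= "15:30"
assert "08:59" < "09:15"          # pre-open compares below market open
assert not ("15:31" <= "15:30")   # post-close compares above market close
```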

Entry Point & Graceful Shutdown

The main() block ties everything together. We also register a SIGINT handler so Ctrl+C logs a clean shutdown message rather than throwing a traceback.

PYTHON · pipeline.py — main()
# ── Graceful shutdown ────────────────────────────────────
_running = True

def _handle_sigint(sig, frame):
    global _running
    log.info("Interrupt received — shutting down cleanly ...")
    _running = False

signal.signal(signal.SIGINT, _handle_sigint)


def main():
    log.info("โ•โ•โ• Vianmax Pipeline โ”€ starting โ•โ•โ•")

    cfg   = load_config()
    kite  = init_kite(cfg)

    # Wait for market open if called early
    now = datetime.now().strftime("%H:%M")
    if now < MARKET_OPEN:
        wait_sec = (
            datetime.strptime(MARKET_OPEN, "%H:%M") -
            datetime.strptime(now, "%H:%M")
        ).seconds
        log.info(f"Market opens at {MARKET_OPEN} — waiting {wait_sec//60}m ...")
        time.sleep(wait_sec)

    pipeline = Pipeline(kite)
    pipeline.start()

    log.info("โ•โ•โ• Pipeline finished. Review logs/ for signals. โ•โ•โ•")


if __name__ == "__main__":
    main()
Run it

python pipeline.py — the pipeline loads warm-up data, waits for market open, connects the WebSocket, and starts logging signals. Signals appear in logs/signals_YYYY-MM-DD.jsonl.

Post-Session Signal Analysis

After the session ends, review the day's signals in a Jupyter notebook or script.

PYTHON · review.py
import pandas as pd, json

def load_signals(jsonl_path: str) -> pd.DataFrame:
    with open(jsonl_path) as f:
        records = [json.loads(line) for line in f if line.strip()]
    df = pd.DataFrame(records)
    df["ts"] = pd.to_datetime(df["ts"])
    return df.set_index("ts")

df = load_signals("logs/signals_2025-01-15.jsonl")

# How many signals per symbol?
print(df.groupby(["symbol", "direction"]).size())

# Average R:R ratio
df["risk"]   = (df["entry"] - df["sl"]).abs()
df["reward"] = (df["target"] - df["entry"]).abs()
df["rr"]     = df["reward"] / df["risk"]
print(f"Avg R:R = {df['rr'].mean():.2f}")

# Signal timing histogram
df["hour"] = df.index.hour
df.groupby("hour").size().plot(kind="bar", title="Signals by Hour")

What Each File Does

File               Responsibility                                            Learned in
config.json        API credentials (api_key, access_token)                   L14
data_loader.py     Historical fetch + CSV cache + warmup                     L15
indicators.py      sma, ema, rsi, vwap, macd, supertrend                     L13
tick_to_candle.py  Tick → OHLCV candle aggregation                           L16
pipeline.py        Orchestrator — session init, WebSocket, signals, logging  L17 ✓
logs/*.jsonl       Append-only signal log, one file per day                  L17 ✓
🎉

Phase 2 Complete!

You've built a production-grade market data pipeline from scratch — authentication, historical data, live ticks, candle assembly, indicators, signals, and logging.

L10 ✓ Pandas Basics — DataFrames & Series
L11 ✓ Reading Market Data — CSV, yfinance, NSEpy
L12 ✓ OHLCV Analysis with Pandas
L13 ✓ Technical Indicators from Scratch
L14 ✓ Zerodha KiteConnect API Setup
L15 ✓ Fetching Historical Data via API
L16 ✓ WebSocket & Live Tick Feed
L17 ✓ Building a Live Data Pipeline


Coming Next: Phase 3 โ€” Strategy Development

You have live data flowing and indicators computing. Phase 3 turns signals into complete strategies with backtesting, position sizing, and walk-forward validation.

Strategy rules & entry/exit logic
Vectorised backtesting engine
Position sizing & Kelly criterion
Walk-forward optimisation
Drawdown & Sharpe analysis
Multi-strategy portfolio