Wire everything together: session init, warm-up data, WebSocket ticks, candle assembly, indicator engine, signal engine, and trade logging, into one production-ready pipeline.
Every lesson in Phase 2 taught you one piece. Today we assemble them into a single, coherent system that runs live during market hours. When the session ends you have a complete JSONL log of every signal your system generated, ready for backtesting or review.
A single pipeline.py file (~250 lines) that is the entry point for your live trading session. No Jupyter, no notebooks: pure production Python you can run from the terminal.
Keep your project clean from day one. A flat src/ layout works well for algo systems.
```
my_algo/
├── config.json          # API keys (never commit!)
├── instruments.csv      # Downloaded once at session start
├── cache/               # Historical data CSVs
│   ├── NIFTY_1min.csv
│   └── BANKNIFTY_1min.csv
├── logs/
│   └── signals_2025-01-15.jsonl   # Daily trade log
├── pipeline.py          # ← main entry point (this lesson)
├── data_loader.py       # MarketDataLoader (L15)
├── indicators.py        # sma/ema/rsi/vwap/macd/supertrend (L13)
├── tick_to_candle.py    # TickToCandle (L16)
└── utils.py             # is_trading_time, logger, etc.
```
Your api_key and access_token are secret credentials. Add config.json and logs/ to .gitignore before your first commit.
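A minimal .gitignore for this layout might look like the following (the entries are suggestions; cache/ is optional to ignore, but the two secret-bearing paths are not):

```
# Secrets and session artifacts: never commit these
config.json
logs/
cache/
__pycache__/
```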
The pipeline starts by loading credentials and verifying the KiteConnect session. If the access token has expired, the pipeline exits with a clear error message rather than failing silently later.
```python
import json, time, logging, signal, sys
from datetime import datetime, date
from pathlib import Path

import pandas as pd
from kiteconnect import KiteConnect, KiteTicker

from data_loader import MarketDataLoader
from indicators import sma, ema, rsi, vwap, macd, supertrend
from tick_to_candle import TickToCandle

# ── Logging ──────────────────────────────────────────────
Path("logs").mkdir(exist_ok=True)   # FileHandler fails if logs/ is missing
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.FileHandler(f"logs/pipeline_{date.today()}.log")
    ]
)
log = logging.getLogger("pipeline")

# ── Constants ────────────────────────────────────────────
NIFTY_TOKEN = 256265
BN_TOKEN = 260105
SUBSCRIBE = [NIFTY_TOKEN, BN_TOKEN]

MARKET_OPEN = "09:15"
MARKET_CLOSE = "15:30"
LAST_ENTRY = "15:15"   # no new signals after this

# ── Config ───────────────────────────────────────────────
def load_config() -> dict:
    path = Path("config.json")
    if not path.exists():
        log.error("config.json not found; run auth flow first")
        sys.exit(1)
    with path.open() as f:
        return json.load(f)

def init_kite(cfg: dict) -> KiteConnect:
    kite = KiteConnect(api_key=cfg["api_key"])
    kite.set_access_token(cfg["access_token"])
    try:
        profile = kite.profile()
        log.info(f"Session OK, user: {profile['user_name']}")
    except Exception as e:
        log.error(f"Session invalid: {e}. Re-run auth flow.")
        sys.exit(1)
    return kite
```
Indicators like RSI(14) or SMA(20) need at least n completed candles before they produce valid values. We fetch historical data before market open so our first live candle already has valid indicator values.
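You can see this requirement directly in pandas: a 20-period rolling mean stays NaN until it has 20 rows of history. This is a quick sanity check, not part of the pipeline:

```python
import pandas as pd

# 25 fake closes; the first 19 values of a 20-period SMA are NaN
closes = pd.Series(range(1, 26), dtype=float)
sma20 = closes.rolling(20).mean()
print(sma20.isna().sum())   # -> 19
```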
```python
# Minimum candles each indicator needs
WARMUP_CANDLES = {
    "rsi": 14,
    "ema_slow": 26,    # MACD slow EMA
    "supertrend": 10,  # ATR period
    "sma200": 200,
}
WARMUP_NEEDED = max(WARMUP_CANDLES.values()) + 20   # buffer

def warmup(kite: KiteConnect) -> dict[int, pd.DataFrame]:
    """Return {token: df} with warm-up OHLCV for each symbol."""
    loader = MarketDataLoader(kite, cache_dir="cache")
    symbols = {
        NIFTY_TOKEN: ("NSE", "NIFTY 50"),
        BN_TOKEN: ("NSE", "NIFTY BANK"),
    }
    dfs = {}
    for token, (exchange, name) in symbols.items():
        log.info(f"Loading warm-up data for {name} ...")
        df = loader.load(
            instrument_token=token,
            interval="minute",
            candles_needed=WARMUP_NEEDED
        )
        # Pre-compute indicators on the warm-up slice
        df = compute_indicators(df)
        dfs[token] = df
        log.info(f"  -> {len(df)} candles loaded for {name}")
    return dfs
```
MarketDataLoader.load() checks cache/ first. On subsequent days the CSV is already there โ only the missing recent candles are fetched via the API. Cold start on day 1 takes ~10s; warm start takes under 1s.
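The real loader was built in L15; a rough sketch of its cache-first strategy looks like the following. The function name `load_cached` and the `fetch_fn` callback are illustrative stand-ins, not the actual class API:

```python
import pandas as pd
from pathlib import Path

def load_cached(cache_path: Path, fetch_fn, candles_needed: int) -> pd.DataFrame:
    """Cache-first load: read the CSV if present, fetch only the gap."""
    df = None
    if cache_path.exists():
        df = pd.read_csv(cache_path, parse_dates=["datetime"], index_col="datetime")
    missing = candles_needed - (len(df) if df is not None else 0)
    if missing > 0:
        fresh = fetch_fn(missing)                    # API hit only for the gap
        df = fresh if df is None else pd.concat([df, fresh])
        df = df[~df.index.duplicated(keep="last")].sort_index()
        cache_path.parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(cache_path)                        # refresh the cache
    return df.tail(candles_needed)
```

On day 1 the fetch callback runs once for the full window; on day 2 only the candles missing from the CSV are requested, which is where the cold-start vs warm-start difference comes from.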
The indicator engine takes a DataFrame of OHLCV candles and returns the same DataFrame enriched with all indicator columns. It works identically on warm-up historical data and on live candles: same function, same output.
```python
def compute_indicators(df: pd.DataFrame) -> pd.DataFrame:
    """Add all indicator columns. Works on any-length DataFrame."""
    df = df.copy()
    c = df["close"]

    # Trend indicators
    df["sma20"] = sma(c, 20)
    df["sma200"] = sma(c, 200)
    df["ema9"] = ema(c, 9)

    # Momentum
    df["rsi14"] = rsi(c, 14)

    # Volatility / trend direction
    st = supertrend(df, atr_period=10, multiplier=3.0)
    df["st_dir"] = st["direction"]    # +1 = bullish, -1 = bearish
    df["st_val"] = st["supertrend"]

    # Volume-weighted price
    df["vwap"] = vwap(df)

    # MACD
    mc = macd(c)
    df["macd"] = mc["macd"]
    df["macd_sig"] = mc["signal"]
    df["macd_hist"] = mc["histogram"]
    return df
```
The signal engine reads the enriched DataFrame (latest row) and produces a structured signal dict. Every signal gets logged to JSONL for post-session review.
```python
def generate_signal(df: pd.DataFrame, symbol: str) -> dict | None:
    """
    Evaluate the latest candle against all conditions.
    Returns a signal dict or None if no trade.
    """
    if len(df) < 202:   # need enough history for SMA200 (plus a prev row)
        return None

    row = df.iloc[-1]
    prev = df.iloc[-2]

    # ── LONG conditions ──────────────────────────────────
    long_conditions = [
        row["close"] > row["sma200"],   # price above 200 SMA
        row["st_dir"] == 1,             # supertrend bullish
        row["close"] > row["vwap"],     # above VWAP
        row["rsi14"] > 55,              # momentum positive
        prev["macd_hist"] < 0 < row["macd_hist"],   # MACD crossover
    ]

    # ── SHORT conditions ─────────────────────────────────
    short_conditions = [
        row["close"] < row["sma200"],
        row["st_dir"] == -1,
        row["close"] < row["vwap"],
        row["rsi14"] < 45,
        prev["macd_hist"] > 0 > row["macd_hist"],
    ]

    # Require ALL conditions to be met (strict filter)
    if all(long_conditions):
        direction = "LONG"
    elif all(short_conditions):
        direction = "SHORT"
    else:
        return None

    # Stop & target from the 14-bar high-low range (a simple ATR proxy)
    atr = (df["high"].rolling(14).max().iloc[-1]
           - df["low"].rolling(14).min().iloc[-1])
    entry = row["close"]
    sl = entry - 1.5 * atr if direction == "LONG" else entry + 1.5 * atr
    tgt = entry + 3.0 * atr if direction == "LONG" else entry - 3.0 * atr

    return {
        "ts": row.name.isoformat(),
        "symbol": symbol,
        "direction": direction,
        "entry": round(entry, 2),
        "sl": round(sl, 2),
        "target": round(tgt, 2),
        "rsi14": round(row["rsi14"], 1),
        "st_dir": int(row["st_dir"]),
        "conditions_met": sum(long_conditions if direction == "LONG"
                              else short_conditions),
    }
```
JSONL (JSON Lines) is the ideal format for trade logs: each line is a valid JSON object, easy to append, easy to query with pandas later. One file per day.
```python
class TradeLogger:
    def __init__(self):
        Path("logs").mkdir(exist_ok=True)
        self.path = Path(f"logs/signals_{date.today()}.jsonl")
        log.info(f"Trade log -> {self.path}")

    def write(self, signal: dict):
        with self.path.open("a") as f:
            f.write(json.dumps(signal) + "\n")
        log.info(
            f"SIGNAL | {signal['symbol']:12s} | {signal['direction']:5s} | "
            f"entry={signal['entry']} | sl={signal['sl']} | tgt={signal['target']}"
        )

# Load the day's log back for analysis:
def load_signals(path: str) -> pd.DataFrame:
    records = []
    with open(path) as f:
        for line in f:
            records.append(json.loads(line.strip()))
    df = pd.DataFrame(records)
    df["ts"] = pd.to_datetime(df["ts"])
    return df.set_index("ts")
```
| Column | Type | Description |
|---|---|---|
| ts | ISO datetime | Candle close timestamp |
| symbol | string | NIFTY 50 or NIFTY BANK |
| direction | LONG / SHORT | Trade direction |
| entry | float | Close price at signal |
| sl | float | Stop-loss (1.5× ATR) |
| target | float | Target (3× ATR = 2:1 R:R) |
| rsi14 | float | RSI at signal candle |
| st_dir | int (+1/-1) | Supertrend direction at signal |
| conditions_met | int (0-5) | Debug: how many filters fired |
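A line in the log file therefore looks like this (the values below are made up for illustration):

```json
{"ts": "2025-01-15T10:42:00", "symbol": "NIFTY 50", "direction": "LONG", "entry": 23510.5, "sl": 23450.2, "target": 23631.1, "rsi14": 58.3, "st_dir": 1, "conditions_met": 5}
```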
When a new 1-minute candle closes, TickToCandle fires on_candle_close(). We append the candle to our warm-up DataFrame, recompute indicators, run the signal engine, and log any signal, all in under 10ms.
```python
def is_trading_time() -> bool:
    now = datetime.now().strftime("%H:%M")
    return MARKET_OPEN <= now <= MARKET_CLOSE

def new_entries_allowed() -> bool:
    now = datetime.now().strftime("%H:%M")
    return MARKET_OPEN <= now <= LAST_ENTRY

class Pipeline:
    def __init__(self, kite: KiteConnect):
        self.kite = kite
        self.logger = TradeLogger()
        self.buffers: dict[int, pd.DataFrame] = {}   # token -> live df
        self.names = {NIFTY_TOKEN: "NIFTY 50", BN_TOKEN: "NIFTY BANK"}

        # Pre-load warm-up data
        self.buffers = warmup(kite)

        # One TickToCandle builder per symbol
        self.builders = {
            token: TickToCandle(
                interval_minutes=1,
                on_candle_close=lambda candle, t=token: self.on_candle(t, candle)
            )
            for token in SUBSCRIBE
        }

    def on_candle(self, token: int, candle: dict):
        """Called each time a 1-min candle closes."""
        if not is_trading_time():
            return
        symbol = self.names[token]

        # 1. Append new candle to buffer
        row = pd.DataFrame([candle]).set_index("datetime")
        self.buffers[token] = pd.concat(
            [self.buffers[token], row]
        ).tail(500)   # keep last 500 candles (about 1.3 trading days)

        # 2. Recompute indicators (fast on a 500-row frame)
        df = compute_indicators(self.buffers[token])
        self.buffers[token] = df

        # 3. Signal engine
        if new_entries_allowed():
            sig = generate_signal(df, symbol)
            if sig:
                self.logger.write(sig)

    def start(self):
        """Connect WebSocket and block until session ends."""
        # KiteConnect keeps the credentials we authenticated with
        kws = KiteTicker(
            api_key=self.kite.api_key,
            access_token=self.kite.access_token
        )

        def on_ticks(ws, ticks):
            for tick in ticks:
                token = tick["instrument_token"]
                if token in self.builders:
                    self.builders[token].on_tick(
                        ltp=tick["last_price"],
                        volume=tick.get("volume", 0),
                        timestamp=tick["exchange_timestamp"]
                    )

        def on_connect(ws, response):
            log.info("WebSocket connected; subscribing ...")
            ws.subscribe(SUBSCRIBE)
            ws.set_mode(ws.MODE_FULL, SUBSCRIBE)

        def on_close(ws, code, reason):
            log.warning(f"WebSocket closed: {code} {reason}")

        def on_error(ws, code, reason):
            log.error(f"WebSocket error: {code} {reason}")

        kws.on_ticks = on_ticks
        kws.on_connect = on_connect
        kws.on_close = on_close
        kws.on_error = on_error

        log.info("Starting WebSocket (threaded) ...")
        kws.connect(threaded=True)

        # Block main thread until market close (or Ctrl+C)
        while is_trading_time() and _running:
            time.sleep(30)

        log.info("Market closed; stopping pipeline.")
        kws.close()
```
The main() block ties everything together. We also register a SIGINT handler so Ctrl+C logs a clean shutdown message rather than throwing a traceback.
```python
# ── Graceful shutdown ────────────────────────────────────
_running = True

def _handle_sigint(sig, frame):
    global _running
    log.info("Interrupt received; shutting down cleanly ...")
    _running = False

signal.signal(signal.SIGINT, _handle_sigint)

def main():
    log.info("=== Vianmax Pipeline starting ===")
    cfg = load_config()
    kite = init_kite(cfg)

    # Wait for market open if called early
    now = datetime.now().strftime("%H:%M")
    if now < MARKET_OPEN:
        wait_sec = (
            datetime.strptime(MARKET_OPEN, "%H:%M")
            - datetime.strptime(now, "%H:%M")
        ).seconds
        log.info(f"Market opens at {MARKET_OPEN}; waiting {wait_sec // 60}m ...")
        time.sleep(wait_sec)

    pipeline = Pipeline(kite)
    pipeline.start()
    log.info("=== Pipeline finished. Review logs/ for signals. ===")

if __name__ == "__main__":
    main()
```
Run python pipeline.py: the pipeline loads warm-up data, waits for market open, connects the WebSocket, and starts logging signals. Signals appear in logs/signals_YYYY-MM-DD.jsonl.
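From the terminal, a session might look like this; tailing the log in a second terminal is just a convenient way to watch signals arrive as they are written:

```shell
python pipeline.py                        # start the session
tail -f "logs/signals_$(date +%F).jsonl"  # watch signals in another terminal
```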
After the session ends, review the day's signals in a Jupyter notebook or script.
```python
import pandas as pd, json

def load_signals(jsonl_path: str) -> pd.DataFrame:
    with open(jsonl_path) as f:
        records = [json.loads(line) for line in f]
    df = pd.DataFrame(records)
    df["ts"] = pd.to_datetime(df["ts"])
    return df.set_index("ts")

df = load_signals("logs/signals_2025-01-15.jsonl")

# How many signals per symbol?
print(df.groupby(["symbol", "direction"]).size())

# Average R:R ratio
df["risk"] = (df["entry"] - df["sl"]).abs()
df["reward"] = (df["target"] - df["entry"]).abs()
df["rr"] = df["reward"] / df["risk"]
print(f"Avg R:R = {df['rr'].mean():.2f}")

# Signal timing histogram
df["hour"] = df.index.hour
df.groupby("hour").size().plot(kind="bar", title="Signals by Hour")
```
| File | Responsibility | Learned in |
|---|---|---|
| config.json | API credentials (api_key, access_token) | L14 |
| data_loader.py | Historical fetch + CSV cache + warm-up | L15 |
| indicators.py | sma, ema, rsi, vwap, macd, supertrend | L13 |
| tick_to_candle.py | Tick → OHLCV candle aggregation | L16 |
| pipeline.py | Orchestrator: session init, WebSocket, signals, logging | L17 (this lesson) |
| logs/*.jsonl | Append-only signal log, one file per day | L17 (this lesson) |
You've built a production-grade market data pipeline from scratch: authentication, historical data, live ticks, candle assembly, indicators, signals, and logging.