The kite.historical_data() method returns OHLCV candle data for any instrument, interval, and date range. It returns a list of dicts — we convert it to a Pandas DataFrame immediately. This is your primary data source for all backtesting and live strategy warm-up.
| Interval | Max Range per Call | Common Use |
|---|---|---|
| minute | 60 days | Intraday scalping strategies |
| 3minute | 100 days | Short-term momentum |
| 5minute | 100 days | Standard intraday (most common) |
| 15minute | 200 days | Swing intraday with trend confirmation |
| 60minute | 400 days | Daily trend context |
| day | 2000 days (~8 years) | Backtesting, swing trading |
from kiteconnect import KiteConnect
import pandas as pd
from datetime import datetime, timedelta

# Assumes kite session is already active (see Lesson 14)
NIFTY_TOKEN = 256265  # instrument_token for NIFTY 50

# ── Fetch 5-minute NIFTY candles for last 30 days ──
end_dt = datetime.now()
start_dt = end_dt - timedelta(days=30)

# historical_data returns a list of dicts, one per candle
candles = kite.historical_data(
    instrument_token=NIFTY_TOKEN,
    from_date=start_dt,
    to_date=end_dt,
    interval="5minute",
    continuous=False,  # False for index/equity; True for futures rollover
)

# Convert to a DataFrame indexed by candle timestamp
df = (
    pd.DataFrame(candles)
    .rename(columns={"date": "datetime"})
    .set_index("datetime")
)

print(f"Candles : {len(df)}")
print(f"From : {df.index[0]}")
print(f"To : {df.index[-1]}")
print(df.tail(3))
Multi-Chunk Fetching for Large Date Ranges
The 5-minute interval is capped at 100 days per API call. For longer backtesting periods, split the date range into chunks and concatenate the results.
from datetime import datetime, timedelta
import pandas as pd
import time


def fetch_historical_chunked(
    kite,
    token: int,
    interval: str,
    from_dt: datetime,
    to_dt: datetime,
    chunk_days: int = 60,
) -> pd.DataFrame:
    """
    Fetch historical data in chunks to bypass per-call date limits.

    Handles rate limiting with sleep between calls.

    Parameters
    ----------
    kite : object with a ``historical_data(...)`` method (KiteConnect session)
    token : instrument_token of the symbol
    interval : candle interval string, e.g. "5minute"
    from_dt, to_dt : date range to fetch (inclusive at both ends)
    chunk_days : max days per API call; must be >= 1

    Returns
    -------
    pd.DataFrame indexed by candle datetime, sorted, without duplicates.

    Raises
    ------
    ValueError if chunk_days < 1 or the whole range returned no data.
    """
    if chunk_days < 1:
        # Guard: chunk_days <= 0 would make the loop below never advance.
        raise ValueError("chunk_days must be >= 1")

    chunks = []
    current = from_dt
    while current < to_dt:
        chunk_end = min(current + timedelta(days=chunk_days), to_dt)
        print(f"📥 Fetching {current.date()} → {chunk_end.date()}")
        try:
            raw = kite.historical_data(
                instrument_token=token,
                from_date=current,
                to_date=chunk_end,
                interval=interval,
                continuous=False,
            )
            if raw:
                chunks.append(pd.DataFrame(raw))
        except Exception as e:
            # Best-effort: log and continue with the remaining chunks.
            print(f"⚠️ Error in chunk: {e}")
        # BUG FIX: the original advanced with `chunk_end + timedelta(days=1)`,
        # which silently skipped every candle in the 24h after each chunk
        # boundary. Start the next chunk exactly at chunk_end; the single
        # overlapping boundary candle is removed by the dedup step below.
        current = chunk_end
        time.sleep(0.35)  # Rate limit: ~3 calls/second allowed

    if not chunks:
        raise ValueError("No data returned for the specified range")

    df = pd.concat(chunks, ignore_index=True)
    df = df.rename(columns={"date": "datetime"})
    df = df.set_index("datetime").sort_index()
    df = df[~df.index.duplicated(keep="last")]  # remove overlaps
    print(f"✅ Total candles: {len(df)}")
    return df


if __name__ == "__main__":
    # Fetch 6 months of 5-minute NIFTY data
    from_dt = datetime(2024, 1, 1)
    to_dt = datetime(2024, 6, 30)
    nifty_5m = fetch_historical_chunked(
        kite, NIFTY_TOKEN, "5minute", from_dt, to_dt, chunk_days=60
    )
Data Normalisation Pipeline
def normalize_kite_data(df: pd.DataFrame) -> pd.DataFrame:
    """
    Normalize raw KiteConnect DataFrame to clean OHLCV format.

    Steps: lowercase columns, keep only OHLCV, coerce to numeric, drop NaN,
    keep only NSE session candles (09:15–15:30), and drop candles whose
    OHLC values are internally inconsistent.

    Parameters
    ----------
    df : DataFrame with a DatetimeIndex and (at least) open/high/low/close/
         volume columns in any letter case. The input is NOT modified.

    Returns
    -------
    A new, cleaned DataFrame.

    Raises
    ------
    ValueError if no candles survive the cleaning steps.
    """
    # BUG FIX: the original renamed columns on the caller's DataFrame in
    # place and then assigned into a column-sliced frame, which both mutated
    # the caller's data and raised SettingWithCopyWarning. Work on a copy.
    df = df.copy()

    # 1. Ensure lowercase columns
    df.columns = [c.lower() for c in df.columns]

    # 2. Keep only OHLCV (drop 'oi' for non-futures)
    ohlcv_cols = [c for c in ["open", "high", "low", "close", "volume"] if c in df.columns]
    df = df[ohlcv_cols].copy()

    # 3. Ensure float dtypes (bad values become NaN, dropped next)
    for col in ohlcv_cols:
        df[col] = pd.to_numeric(df[col], errors="coerce")

    # 4. Drop NaN
    df = df.dropna()

    # 5. Filter NSE session hours only (09:15 to 15:30)
    df = df.between_time("09:15", "15:30")

    # 6. Validate OHLC integrity: high must be the max, low the min
    valid = (
        (df["high"] >= df["open"]) & (df["high"] >= df["close"]) &
        (df["low"] <= df["open"]) & (df["low"] <= df["close"])
    )
    df = df[valid]

    if df.empty:
        # Fail loudly instead of crashing on df.index[0] below.
        raise ValueError("No candles left after normalisation")

    print(f"✅ Normalised: {len(df)} candles | {df.index[0].date()} → {df.index[-1].date()}")
    return df


if __name__ == "__main__":
    df_clean = normalize_kite_data(nifty_5m)
    print(df_clean.dtypes)
Production Data Loader with Cache
import json
from pathlib import Path


class MarketDataLoader:
    """
    Production-grade market data loader.

    Fetches from KiteConnect API on first run, uses local CSV cache thereafter.
    """

    def __init__(self, kite: KiteConnect, data_dir: str = "data"):
        self.kite = kite
        self.data_dir = Path(data_dir)
        # Create the cache directory on first use.
        self.data_dir.mkdir(exist_ok=True)

    def _cache_path(self, symbol: str, interval: str) -> Path:
        # One CSV file per (symbol, interval) pair.
        return self.data_dir / f"{symbol}_{interval}.csv"

    def get(self, symbol: str, token: int, interval: str,
            days: int = 60, force_refresh: bool = False) -> pd.DataFrame:
        """
        Get OHLCV data. Uses cache if available and not force_refresh.
        """
        cache = self._cache_path(symbol, interval)

        if cache.exists() and not force_refresh:
            print(f"📂 {symbol} {interval} — loaded from cache")
            return pd.read_csv(cache, index_col=0, parse_dates=True)

        # Cache miss (or forced refresh): pull fresh data from the API,
        # then clean it before persisting.
        end_dt = datetime.now()
        start_dt = end_dt - timedelta(days=days)
        frame = fetch_historical_chunked(self.kite, token, interval, start_dt, end_dt)
        frame = normalize_kite_data(frame)

        frame.to_csv(cache)
        print(f"💾 Saved to cache: {cache}")
        return frame


# Usage
loader = MarketDataLoader(kite)
nifty_5m = loader.get("NIFTY", 256265, "5minute", days=60)
nifty_15m = loader.get("NIFTY", 256265, "15minute", days=100)
bn_5m = loader.get("BANKNIFTY", 260105, "5minute", days=60)
print(f"NIFTY 5m : {len(nifty_5m)} candles")
print(f"NIFTY 15m: {len(nifty_15m)} candles")
print(f"BN 5m : {len(bn_5m)} candles")
Warm-Up Candles — Live Strategy Preparation
Most indicators (EMA, RSI, VWAP) need warm-up candles to produce accurate values. Before your algo starts trading at 09:15, pre-load historical candles so indicators are already "warm."
from datetime import datetime, time as dtime, timedelta
import math


def warmup_data(kite, token: int, interval: str,
                warmup_candles: int = 50) -> pd.DataFrame:
    """
    Fetch enough historical candles to warm up all indicators.
    Called once before market opens.

    Parameters
    ----------
    kite : active KiteConnect session (object with ``historical_data``)
    token : instrument_token to fetch
    interval : candle interval string, e.g. "5minute"
    warmup_candles : minimum number of candles the indicators need

    Returns
    -------
    pd.DataFrame of candles indexed by datetime (may contain more than
    ``warmup_candles`` rows — extra history only makes indicators better).
    """
    # Approximate candles in one NSE trading day for each interval.
    candles_per_day = {
        "minute": 375, "3minute": 125, "5minute": 75,
        "15minute": 25, "60minute": 6, "day": 1,
    }
    cpd = candles_per_day.get(interval, 75)  # default to 5minute density

    # BUG FIX: the original used floor division plus a flat 5-day buffer,
    # which undershoots for coarse intervals — e.g. 50 daily candles need
    # ~50 *trading* days (~75 calendar days), but 50 // 1 + 5 = 55 calendar
    # days yields only ~38 trading candles. Scale by 1.5 for weekends and
    # holidays, round up, and keep a small safety buffer.
    days_needed = math.ceil(warmup_candles * 1.5 / cpd) + 5

    to_dt = datetime.now()
    from_dt = to_dt - timedelta(days=days_needed)

    raw = kite.historical_data(
        instrument_token=token,
        from_date=from_dt,
        to_date=to_dt,
        interval=interval,
    )
    df = pd.DataFrame(raw).rename(columns={"date": "datetime"}).set_index("datetime")
    print(f"✅ Warmup ready: {len(df)} candles ({days_needed} days fetched)")
    return df


if __name__ == "__main__":
    # Before market opens: prepare the candle buffer
    buffer = warmup_data(kite, 256265, "5minute", warmup_candles=50)

    # Add indicators immediately (they're now accurate, not NaN)
    buffer["ema9"] = buffer["close"].ewm(span=9, adjust=False).mean()
    buffer["ema21"] = buffer["close"].ewm(span=21, adjust=False).mean()
    print(f"Last EMA9 : {buffer['ema9'].iloc[-1]:.2f}")
    print(f"Last EMA21: {buffer['ema21'].iloc[-1]:.2f}")
Alternative: Upstox & Angel SmartAPI
Don't have a Zerodha account? Both Upstox and Angel Broking provide free historical data APIs. The workflow is identical — different SDK names, same Pandas patterns.
# ── Upstox v2 API (free) ── # pip install upstox-python-sdk import upstox_client from upstox_client.rest import ApiException configuration = upstox_client.Configuration() configuration.access_token = "your_upstox_token" hist_api = upstox_client.HistoryApi(upstox_client.ApiClient(configuration)) response = hist_api.get_historical_candle_data( instrument_key = "NSE_INDEX|Nifty 50", # Upstox instrument key format interval = "5minute", to_date = "2024-01-31", from_date = "2024-01-01" ) df_up = pd.DataFrame(response.data.candles, columns=[ "datetime","open","high","low","close","volume","oi" ]) print(df_up.tail(3))
For Angel SmartAPI, install the SDK with `pip install smartapi-python` and apply the same Pandas patterns from this lesson.

Quiz
Exercises
1. Using the `MarketDataLoader`, fetch 60 days of 5-minute data for both NIFTY 50 and BANKNIFTY. Save both to CSV. Then verify the data by printing: number of candles, date range, and average daily volume for each symbol.
2. Write a function `prepare_algo(kite, token, interval)` that: fetches 50 warm-up candles via the historical API, adds EMA9, EMA21, RSI(14), and VWAP, and returns the ready-to-trade DataFrame with all indicators populated. Print the last row of each indicator to confirm no NaN values.