
(AI Blog#17) RAG - Preparing Knowledge Base - Data Extractions, Chunking, Embedding, Vector Store/DB

RAG (Retrieval-Augmented Generation) is a technique that makes AI models (LLMs) more accurate, up-to-date, and context-aware by combining two things:

  • Retrieval (fetching relevant data)
  • Generation (creating a response using an LLM)
Why is RAG needed?

Traditional LLMs (like GPT models):
  • Have fixed knowledge (based on training data)
  • Can hallucinate (make up answers)
  • Don't know your private/company data
RAG solves these issues by injecting real-time or custom data into the model. Before we can build a RAG pipeline, we need to prepare our knowledge base. This blog discusses preparing the knowledge base in detail; it is a very important step in building a RAG pipeline.

RAG Pipeline :

Please refer to the image below; it covers all the topics we discuss in this blog and the next one.

Understand that LLMs are pre-trained models: they are trained on data extracted from various internet sources. To make this clear, if you ask an LLM "What is the capital of Andhra Pradesh?" it will say "Amaravati", but if you ask "What is our company's sick leave policy?" it will be confused, because it doesn't know which company you are referring to! It has no access to our company's database. This is where RAG comes into the picture: it injects our project-specific data in a safe way.

The first and foremost pre-step before building a RAG pipeline is preparing a knowledge base. It could be a PDF file, a web page, a relational database, or a file system: any data source that is proprietary to our organization. We have to place this data into a database called a Vector DB.

This is a 4-step process, as below, and it is called Indexing:

  • Data Extraction
  • Data Chunking
  • Data Embedding
  • Store Embedded data into Vector DB

This means our organization's proprietary data will be stored in a system called a Vector DB. This process of storing our data/knowledge base is called indexing.

Note: It is extremely important to note that this is NOT the retrieval phase of RAG. This is indexing, i.e. preparing our knowledge base, and it must happen even before we start building the RAG pipeline.
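The four indexing steps can be sketched end-to-end in a few lines of Python. Everything here is illustrative: embed() is a toy stand-in for a real embedding model, and a plain list stands in for a Vector DB:

```python
def extract(source: str) -> str:                      # 1) Data extraction
    return source

def chunk(text: str, size: int = 40) -> list[str]:    # 2) Data chunking
    return [text[i:i + size] for i in range(0, len(text), size)]

def embed(text: str) -> list[float]:                  # 3) Embedding (toy: vowel frequencies)
    return [text.count(c) / max(len(text), 1) for c in "aeiou"]

vector_db: list[tuple[str, list[float]]] = []         # 4) "Vector DB" (in-memory stand-in)

for c in chunk(extract("Our company's sick leave policy allows 12 paid days per year.")):
    vector_db.append((c, embed(c)))

print(len(vector_db), "chunks indexed")
```

In a real pipeline, each of these functions is replaced by the extractors, chunkers, embedding models, and vector stores discussed in the rest of this blog.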


What is RAG ?

Once you have built this knowledge base, suppose a user asks the question "What are the benefits of renewable energy?" and assume the answer is proprietary company data available in our knowledge base. Once the RAG system receives this question, the RETRIEVAL step starts. It reads the user query and converts it into an EMBEDDING (chunking is optional here, since a user query is usually far shorter than the context window). We then compare this query embedding with the embeddings of the chunks stored in the Vector DB using similarity search. In other words, the embeddings that were stored in the Vector DB while preparing the knowledge base are retrieved based on their similarity score.

We have different ways to do a similarity search, but the most common in production is cosine similarity, with the formula:

    cosine_similarity(A, B) = (A · B) / (‖A‖ × ‖B‖)

Where:

  • A, B are two vectors (in our case, the user query and a matched embedding in the Vector DB)
  • A · B is the dot product of the two vectors
  • ‖A‖, ‖B‖ are the magnitudes (lengths) of the vectors

Final result :

If the cosine similarity score of two given vectors A and B is ~0.99, the two vectors are almost identical, and that chunk will be returned from the Vector DB. Similarly, we can fetch the top-n most similar chunks from the Vector DB based on the user's requirement.

I hope it is now clear how a similarity match is found using cosine similarity in real time. This entire procedure is called the RETRIEVAL mechanism in RAG.
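As a concrete illustration, cosine similarity can be computed in a few lines of plain Python. The toy vectors here are made up; real embeddings have hundreds or thousands of dimensions:

```python
import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    # cos(theta) = (A . B) / (||A|| * ||B||)
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)

# Two nearly parallel vectors score close to 1.0
print(cosine_similarity([1.0, 2.0, 3.0], [1.1, 2.1, 2.9]))
```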

Also, we need to understand the concept of the Context Window to understand why we do chunking. The image below shows the context window sizes of the GPT models from OpenAI.

Context Window :

A context window is the maximum amount of text (measured in tokens; each sub-word in the context is a token) that an AI model can read, remember, and use at one time while generating a response. It is the model's short-term memory limit. The image above shows the context window size of each GPT model. It has evolved from 2K tokens in GPT-3 to 1M tokens in GPT-4.1, enabling modern LLMs to process entire documents, code bases, and long conversations efficiently.
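For a quick back-of-the-envelope check of whether text fits a context window, a common rule of thumb is roughly 4 characters per token for English text. This is only an approximation, not an exact tokenizer:

```python
def estimate_tokens(text: str) -> int:
    # Rough heuristic: ~4 characters per token for typical English text.
    # Real systems use the model's actual tokenizer for exact counts.
    return max(1, len(text) // 4)

print(estimate_tokens("A context window is the model's short-term memory limit."))
```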

That's why, once we extract the data in step 1 (data extraction, as part of preparing the knowledge base), we chunk the data, convert the chunks into embeddings, and then store those embedded chunks in the Vector DB.



The output of the RETRIEVAL step is the set of corresponding embeddings (and their chunks) collected from the knowledge base based on similarity score. If the user asks for the top 3 chunks, the results with the top 3 similarity scores are retrieved from the Vector DB.
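A minimal top-k retrieval over an in-memory store can be sketched as follows; the chunks and embeddings here are made up purely for illustration:

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b)))

# Toy knowledge base: (chunk text, embedding) pairs
store = [
    ("Solar power reduces emissions.", [0.9, 0.1, 0.0]),
    ("Wind energy is renewable.",      [0.8, 0.2, 0.1]),
    ("Our office opens at 9 AM.",      [0.0, 0.1, 0.9]),
]

def retrieve_top_k(query_emb, k=2):
    # Rank all stored chunks by cosine similarity to the query, highest first
    scored = sorted(store, key=lambda item: cosine(query_emb, item[1]), reverse=True)
    return [text for text, _ in scored[:k]]

print(retrieve_top_k([0.85, 0.15, 0.05], k=2))
```

A real Vector DB does the same ranking, but with approximate-nearest-neighbor indexes so it scales to millions of chunks.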


Augmentation 

Combining the output of the RETRIEVAL step with the user query is Augmentation. We use prompting techniques for this. Please refer to the diagram at the start of this blog.
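A minimal sketch of the augmentation step, assuming a simple prompt template (the template wording is illustrative, not a fixed standard):

```python
def build_augmented_prompt(user_query: str, retrieved_chunks: list[str]) -> str:
    # Stitch the retrieved context and the user's question into one prompt
    context = "\n".join(f"- {chunk}" for chunk in retrieved_chunks)
    return (
        "Answer the question using ONLY the context below.\n\n"
        f"Context:\n{context}\n\n"
        f"Question: {user_query}"
    )

print(build_augmented_prompt(
    "What are the benefits of renewable energy?",
    ["Solar power reduces emissions.", "Wind energy is renewable."],
))
```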


Generation 

Generation uses the LLM. The output of the retrieval step plus the user query is submitted to the LLM, which articulates the final response. This is called Generation.


Important points to remember :

  • In real-world projects, most engineers run into issues because the indexing part is not handled properly
  • We need to understand the correct format of our input files and use the corresponding extractors to extract the text; otherwise, we end up with an incorrect knowledge base containing stale data or no data at all
  • For example, not all .PDF files are real PDFs; they might be screenshots wrapped up as a .PDF file, and if you use a simple PDF extractor on such files, you will definitely run into issues
  • Indexing is the most complicated part; it needs to be handled cautiously and validated once done
  • If you manage to get this part right, the rest of RAG becomes comfortable

Let us start exploring the Indexing part, which is a pre-step for RAG.


Indexing 

1) Data extraction & processing

We are going to deal with all of the file formats below:

  • PDF
  • Scanned image
  • docx
  • pptx
  • html
  • xlsx
  • JSON

PDF Files : We can use the libraries below to parse PDF files:

  • PyPDF2
  • PDFPlumber - Stable library for production
  • PyMuPDF

Implementation of data extraction using PyPDF2 :

from pathlib import Path
import PyPDF2

# Get the path to the PDF file
pdf_path = Path(__file__).parent / "Data" / "financial_report_2024.pdf"

def extract_pypdf2(pdf_path: Path) -> str:
    with open(pdf_path, "rb") as f:
        reader = PyPDF2.PdfReader(f)
        text = ""
        for page in reader.pages:
            text += page.extract_text()
        return text

print(extract_pypdf2(pdf_path))

  • We refer to a file called "financial_report_2024.pdf" in the 'Data' folder inside the same directory as our code
  • We use the Path class from pathlib to build the path
  • In the function above, we open the PDF file as 'f' and read each page using the PdfReader class
  • As each page is read, we append its content to the string variable text, which the function returns
Note that PyPDF2 is able to read the file, but it does not behave well for tables, headers, etc. in the PDF. It reads simple text (including header and footer) correctly, but not the other parts of the PDF file. This is the issue with PyPDF2. Just see how it printed the output in the screenshot below.


Implementation of data extraction using PDFPlumber :

from pathlib import Path
import pdfplumber

# Get the path to the PDF file
pdf_path = Path(__file__).parent / "Data" / "financial_report_2024.pdf"

def extract_pdfplumber(pdf_path: Path) -> str:
    text = ""
   
    with pdfplumber.open(pdf_path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
           
            if page_text:  # Handle None cases
                text += page_text + "\n"
   
    return text

print(extract_pdfplumber(pdf_path))

This is almost the same code, just using PDFPlumber instead of PyPDF2, and see how accurately it reads the text from the PDF file. It is far better than PyPDF2, which is why it is the standard library we use in real projects for extracting data from PDF files.

It printed tables as tables and normal text as text, which is far more powerful. PDFPlumber retains the exact structure of the source document.

Output : 



Observe the difference below between PyPDF2 and PDFPlumber:

PDFPlumber properly articulates the content of the PDF. Hence it is recommended to use PDFPlumber in real projects.

Another example : 
Look at the PDF below; it has 2 columns and we need to read it.


from pathlib import Path
import pdfplumber

# Get the path to the PDF file
pdf_path = Path(__file__).parent / "Data" / "1810.04805v2.pdf"

def extract_pdfplumber(pdf_path: Path) -> str:
    with pdfplumber.open(pdf_path) as pdf:
        text = ""
        for page in pdf.pages:
            text += page.extract_text(use_text_flow=True)
        return text
       
print("pdfplumber extracted text:")
print(extract_pdfplumber(pdf_path)[:500])
print("-" * 100)

  • We are still using PDFPlumber; the only difference is one additional parameter, use_text_flow=True
  • The rest of the program is the same as the first example. If we enable this property, pdfplumber reads the PDF following the flow of the text.
  • If we don't enable this property, it reads the PDF row by row blindly.

Try disabling the property above when you run it: the extractor will then blindly read row by row.

Also note that all these implementations are available in frameworks like LangChain as well : https://docs.langchain.com/oss/javascript/integrations/document_loaders

But the reason for learning all these classes is that we should understand their implementation so that we can work independently if our client is not using LangChain. This helps us write our own logic.

Note :

So far, we have seen how to extract data from PDF files using PyPDF2 & PDFPlumber. Let's see how to extract data from scanned images.

What are scanned images ?

  • When you take a photo on your mobile and need the data extracted from that photo
  • When you have photos or screenshots of a document converted into a .PDF file. This is not an actual PDF file, is it? We will see how to extract data in such cases.

A popular technique for handling scanned images is OCR (Optical Character Recognition). It reads text from scanned documents & images and extracts it into editable, searchable text.


Tesseract :

Tesseract is one of the most popular packages for extracting text from scanned images, as mentioned above. It is meant for POCs and is not recommended for production. For enterprise-level or production applications, the cloud-based services below are available.

Cloud-based OCR APIs (recommended for production):

  • Google :
    • Google cloud vision API - to handle images from scanned documents
    • Google document AI - to handle text from scanned documents
  • Microsoft :
    • Azure computer vision OCR - to handle images from scanned documents
    • Azure Form recognizer - to handle text from scanned documents
  • AWS 
    • Amazon Textract - it will handle both images, text from scanned documents


Note :

Please download the .exe below from the following GitHub location for Tesseract to work on your local machine: https://github.com/UB-Mannheim/tesseract/wiki


Also, download the zip folder below and place it in any folder on your computer, from the following location: https://github.com/oschwartz10612/poppler-windows/releases/



Implementation of Tesseract in local machine :

from pdf2image import convert_from_path
import pytesseract
from pathlib import Path

# ✅ Set Tesseract path (IMPORTANT on Windows)
pytesseract.pytesseract.tesseract_cmd = r"C:\Program Files\Tesseract-OCR\tesseract.exe"


# ✅ Use raw string for Windows path
pdf_path = Path(r"D:\GenAI Content\AI code\4_RAG_Indexing\1_Document_Processing\Data\fixed_scanned_image.pdf")


def extract_text_from_scanned_pdf(pdf_path: Path):
    text = ""

    # ✅ Convert PDF → images (fix: pass str path)
    images = convert_from_path(
        str(pdf_path),  # ⚠️ Important fix
        poppler_path=r"D:\GenAI Content\Release-25.12.0-0\poppler-25.12.0\Library\bin"
    )

    for i, image in enumerate(images):
        print(f"Processing page {i+1}...")

        # ✅ Improve OCR accuracy with config
        page_text = pytesseract.image_to_string(
            image,
            config="--oem 3 --psm 6"
        )

        text += page_text + "\n"

    return text


if __name__ == "__main__":
    result = extract_text_from_scanned_pdf(pdf_path)
    print(result)

Output :


Input File :


  • Carefully observe the paths for the input scanned file, Tesseract, and Poppler in the code.
  • Observe the image in the input file.
  • Note that this only extracts text from scanned images; if you want to represent the image itself, you need to convert it into a vector representation using models like CLIP or similar methods.
  • We are using the pytesseract library to extract text from the image in the code above.
  • config="--oem 3 --psm 6"
    • oem is the OCR engine mode; it has multiple modes and we selected 3, the default, which picks the best available engine
      • mode 1 uses the LSTM engine only, and mode 2 combines the legacy engine with LSTM
    • psm is the page segmentation mode
      • it helps Tesseract understand the layout of the image
      • it has different options such as 0, 3, 6, 7, 8, 10
      • we selected 6, which assumes a single uniform block of text in the image


Implementation of PyMuPDF with Tesseract :

import fitz  # PyMuPDF
import pytesseract
import cv2
import numpy as np
from PIL import Image

# Set Tesseract path (Windows)
pytesseract.pytesseract.tesseract_cmd = r"C:\Users\anilk\AppData\Local\Programs\Tesseract-OCR\tesseract.exe"

pdf_path = "C:\\Personal\\2024\\Learning\\Generative AI\\RAG\\27_Context_Engineering\\2_RAG\\1_Document_Processing\\Data\\CIA-RDP82-00038R001800200001-1.pdf"

def preprocess_image(pil_image):
    img = np.array(pil_image)

    # Convert to grayscale
    gray = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)

    # Remove noise
    gray = cv2.medianBlur(gray, 3)

    # Threshold (important for old scans)
    _, thresh = cv2.threshold(gray, 150, 255, cv2.THRESH_BINARY)

    return thresh


def extract_text(pdf_path):
    doc = fitz.open(pdf_path)
    full_text = ""

    for page_num, page in enumerate(doc):
        print(f"Processing page {page_num + 1}/{len(doc)}")

        # Convert page → image
        pix = page.get_pixmap(dpi=300)  # high DPI improves OCR
        img = Image.frombytes("RGB", [pix.width, pix.height], pix.samples)

        # Preprocess
        processed = preprocess_image(img)

        # OCR
        text = pytesseract.image_to_string(
            processed,
            config="--oem 3 --psm 6"
        )

        full_text += f"\n--- Page {page_num+1} ---\n{text}"

    return full_text


result = extract_text(pdf_path)

# Save output
with open("output.txt", "w", encoding="utf-8") as f:
    f.write(result)

print("Extraction completed!")

This is another way of extracting text from a complex PDF that is not an actual PDF but a .pdf wrapper around screenshots.


Handling PPT files :

In case our data is in the form of PPT files, how do we handle it? Let's see:

from pptx import Presentation

ppt_path = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\Prompt_Engineering_Guide.pptx"

def extract_ppt_text(ppt_path):
    prs = Presentation(ppt_path)
    all_text = ""

    for i, slide in enumerate(prs.slides):
        all_text += f"\n--- Slide {i+1} ---\n"

        for shape in slide.shapes:
            if shape.has_text_frame:
                for paragraph in shape.text_frame.paragraphs:
                    all_text += paragraph.text + "\n"

    return all_text


text = extract_ppt_text(ppt_path)
print(text)

It extracts the text slide by slide. We are using the Presentation class from the pptx library.

Output :



Handling PDF files with multiple types of tables :

We have a powerful library called tabula. Using this library, we can extract text from complex PDF files that contain multiple types of tables.

Implementation :

# Using tabula
from tabula.io import read_pdf
import pandas as pd

pdf_path = r'D:\GenAI Content\AI code\4_RAG_Indexing\1_Document_Processing\Data\Safari_text.pdf'

# Try lattice mode first (for tables with visible borders)
tables = read_pdf(
    pdf_path,
    pages='all',
    multiple_tables=True,
    lattice=True,
    guess=False,
    pandas_options={'header': None},
)

# Fallback to stream mode if no tables found
if not tables:
    tables = read_pdf(
        pdf_path,
        pages='all',
        multiple_tables=True,
        stream=True,
        guess=False,
        pandas_options={'header': None},
    )

for i, table in enumerate(tables, 1):
    print(f"\nTable {i}:")
    print(table)
    print("-" * 50)

Output :




But always remember: if you want to reduce cost, you need to write your own extraction logic.


Handling xlsx files :

  • Approach-1
    • Using Pandas
  • Approach-2
    • Using openpyxl library, load_workbook()


import pandas as pd
from openpyxl import load_workbook

excel_path = 'C:\\Personal\\2024\\Learning\\Generative AI\\RAG\\27_Context_Engineering\\2_RAG\\1_Document_Processing\\Data\\sales_data.xlsx'

# ── Method 1: pandas — best for data analysis ─────────────────────────────────
print("=" * 60)
print("METHOD 1: pandas read_excel (preserves table structure)")
print("=" * 60)

df = pd.read_excel(excel_path, sheet_name='Sales Data')
print(df.to_string(index=False))

# Basic analysis
print(f"\nTotal Orders : {len(df)}")
print(f"Total Revenue: ${df['Total ($)'].sum():,.2f}")
print(f"\nRevenue by Category:")
print(df.groupby('Category')['Total ($)'].sum().to_string())

# ── Method 2: openpyxl — best for reading cell-by-cell with formatting ────────
print("\n" + "=" * 60)
print("METHOD 2: openpyxl (reads raw cell values + structure)")
print("=" * 60)

wb = load_workbook(excel_path, data_only=True)  # data_only=True reads formula results
ws = wb['Sales Data']

# Print headers
headers = [cell.value for cell in ws[1]]
print(" | ".join(str(h) for h in headers))
print("-" * 80)

# Print data rows (skip header + totals row)
for row in ws.iter_rows(min_row=2, max_row=ws.max_row - 1, values_only=True):
    print(" | ".join(str(v) if v is not None else "" for v in row))

# ── Method 3: pandas — read all sheets at once ────────────────────────────────
print("\n" + "=" * 60)
print("METHOD 3: Read ALL sheets into a dictionary")
print("=" * 60)

all_sheets = pd.read_excel(excel_path, sheet_name=None)
for sheet_name, sheet_df in all_sheets.items():
    print(f"\nSheet: '{sheet_name}' → {sheet_df.shape[0]} rows x {sheet_df.shape[1]} cols")
    print(sheet_df.head(3).to_string(index=False))

Output :




Handling docx files :

from docx import Document

doc = Document('C:\\Personal\\2024\\Learning\\Generative AI\\RAG\\27_Context_Engineering\\2_RAG\\1_Document_Processing\\Data\\employee_report.docx')

# ── Method 1: Read all paragraphs with style info ─────────────────────────────
print("=" * 60)
print("METHOD 1: Paragraphs with Headings & Styles")
print("=" * 60)

for para in doc.paragraphs:
    if not para.text.strip():
        continue

    # Fix: safely get style name, default to 'Normal' if None
    style = para.style.name if para.style is not None else 'Normal'

    if style == 'Heading 1':
        print(f"\n{'#' * 60}")
        print(f"  H1: {para.text}")
        print(f"{'#' * 60}")
    elif style == 'Heading 2':
        print(f"\n  >> H2: {para.text}")
        print(f"  {'-' * 40}")
    elif style == 'Heading 3':
        print(f"\n    > H3: {para.text}")
    elif 'List' in style:
        print(f"    • {para.text}")
    else:
        formatted = ""
        for run in para.runs:
            if run.bold:
                formatted += f"[BOLD: {run.text}]"
            elif run.italic:
                formatted += f"[ITALIC: {run.text}]"
            else:
                formatted += run.text
        print(f"    {formatted}")

# ── Method 2: Read tables preserving structure ────────────────────────────────
print("\n" + "=" * 60)
print("METHOD 2: Tables with Structure")
print("=" * 60)

for t_idx, table in enumerate(doc.tables, 1):
    print(f"\nTable {t_idx}: {len(table.rows)} rows x {len(table.columns)} cols")
    print("-" * 60)

    for r_idx, row in enumerate(table.rows):
        cells = [cell.text.strip() for cell in row.cells]
        label = "HEADER" if r_idx == 0 else f"Row {r_idx:>2}"
        print(f"  {label}: {' | '.join(cells)}")

# ── Method 3: Full document structure as dict ─────────────────────────────────
print("\n" + "=" * 60)
print("METHOD 3: Full Document Structure as Dictionary")
print("=" * 60)

structure = {"headings": [], "paragraphs": [], "tables": [], "lists": []}

for para in doc.paragraphs:
    if not para.text.strip():
        continue

    # Fix: safely get style name here too
    style = para.style.name if para.style is not None else 'Normal'

    if 'Heading' in style:
        structure["headings"].append({"level": style, "text": para.text})
    elif 'List' in style:
        structure["lists"].append(para.text)
    else:
        structure["paragraphs"].append(
            para.text[:80] + "..." if len(para.text) > 80 else para.text
        )

for table in doc.tables:
    rows = [[cell.text.strip() for cell in row.cells] for row in table.rows]
    structure["tables"].append({
        "rows": len(table.rows),
        "cols": len(table.columns),
        "data": rows
    })

print(f"  Headings  : {len(structure['headings'])}")
print(f"  Paragraphs: {len(structure['paragraphs'])}")
print(f"  Tables    : {len(structure['tables'])}")
print(f"  List items: {len(structure['lists'])}")

print("\n  Headings found:")
for h in structure["headings"]:
    print(f"    [{h['level']}] {h['text']}")

print("\n  Table data:")
for i, tbl in enumerate(structure["tables"], 1):
    print(f"    Table {i} ({tbl['rows']}x{tbl['cols']}):")
    for row in tbl["data"]:
        print(f"      {row}")

Observe that we handled headings, subheadings, paragraphs, etc. separately in the code above.

Output :



Handling HTML files :


from bs4 import BeautifulSoup
import pandas as pd

with open('D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\company_report.html', 'r', encoding='utf-8') as f:
    soup = BeautifulSoup(f, 'lxml')

# ── Method 1: Document metadata ───────────────────────────────────────────────
print("=" * 60)
print("METHOD 1: Document Metadata")
print("=" * 60)

print(f"Title   : {soup.title.text.strip()}")
print(f"H1      : {soup.find('h1').text.strip()}")
print(f"Sections: {len(soup.find_all('section'))}")
print(f"Tables  : {len(soup.find_all('table'))}")
print(f"Lists   : {len(soup.find_all(['ul', 'ol']))}")

# ── Method 2: All sections with headings + content ────────────────────────────
print("\n" + "=" * 60)
print("METHOD 2: Sections with Headings & Paragraphs")
print("=" * 60)

for section in soup.find_all('section'):
    section_id = section.get('id', 'unknown')
    h2 = section.find('h2')
    heading = h2.text.strip() if h2 else 'No heading'
    print(f"\n[Section: #{section_id}] {heading}")
    print("-" * 50)

    # Paragraphs
    for p in section.find_all('p'):
        text = p.get_text(strip=True)
        if text:
            print(f"  Para : {text[:100]}{'...' if len(text) > 100 else ''}")

    # Blockquotes
    for bq in section.find_all('blockquote'):
        quote = bq.get_text(separator=' ', strip=True)
        print(f"  Quote: {quote[:100]}...")

    # List items
    for ul in section.find_all(['ul', 'ol']):
        list_id = ul.get('id', 'list')
        items = [li.get_text(strip=True) for li in ul.find_all('li')]
        print(f"  List [{list_id}]: {len(items)} items")
        for item in items:
            print(f"    • {item}")

# ── Method 3: KPI cards as structured data ────────────────────────────────────
print("\n" + "=" * 60)
print("METHOD 3: KPI Cards → Structured Data")
print("=" * 60)

kpis = []
for card in soup.find_all('div', class_='kpi-card'):
    kpis.append({
        'metric'  : card.get('data-metric', 'N/A'),
        'label'   : card.find(class_='kpi-label').text.strip(),
        'value'   : card.find(class_='kpi-value').text.strip(),
        'change'  : card.find(class_='kpi-change').text.strip(),
        'trend'   : 'up' if 'up' in card.find(class_='kpi-change').get('class', [])
                    else 'down'
    })

df_kpi = pd.DataFrame(kpis)
print(df_kpi.to_string(index=False))

# ── Method 4: HTML table → DataFrame ─────────────────────────────────────────
print("\n" + "=" * 60)
print("METHOD 4: Product Table → Pandas DataFrame")
print("=" * 60)

table = soup.find('table', id='product-table')

# Extract headers
headers = [th.text.strip() for th in table.select('thead th')]

# Extract rows preserving badge text
rows = []
for tr in table.select('tbody tr'):
    cells = [td.get_text(strip=True) for td in tr.find_all('td')]
    rows.append(cells)

df_products = pd.DataFrame(rows, columns=headers)
print(df_products.to_string(index=False))

# Extra: filter only Active products
print("\n  Active products only:")
active = df_products[df_products['Status'] == 'Active']
print(active[['Product Name', 'Q1 Revenue', 'Growth']].to_string(index=False))

# ── Method 5: Full document structure as dict ─────────────────────────────────
print("\n" + "=" * 60)
print("METHOD 5: Full Structure Summary")
print("=" * 60)

structure = {
    'title'    : soup.title.text.strip(),
    'sections' : [],
}

for section in soup.find_all('section'):
    h2      = section.find('h2')
    lists   = section.find_all(['ul', 'ol'])
    tables  = section.find_all('table')
    paras   = section.find_all('p')

    structure['sections'].append({
        'id'      : section.get('id'),
        'heading' : h2.text.strip() if h2 else None,
        'paragraphs' : len(paras),
        'lists'   : [
            {'id': l.get('id'), 'items': [li.get_text(strip=True)
                                                         for li in l.find_all('li')]}
            for l in lists
        ],
        'tables'  : len(tables),
    })

for s in structure['sections']:
    print(f"\n  [{s['id']}] {s['heading']}")
    print(f"    Paragraphs : {s['paragraphs']}")
    print(f"    Tables     : {s['tables']}")
    for lst in s['lists']:
        print(f"    List [{lst['id']}]: {len(lst['items'])} items → {lst['items'][:2]}...")

Output :


Note : We use the BeautifulSoup class from the bs4 library to handle HTML files.


Handling JSON files :

import json
import pandas as pd
from datetime import datetime

# Use a raw string so the backslashes in the Windows path are not treated as escapes
with open(r'D:\GenAI Content\AI code\4_RAG_Indexing\1_Document_Processing\Data\company_data.json', 'r', encoding='utf-8') as f:
    data = json.load(f)

# ── Method 1: Top-level metadata ──────────────────────────────────────────────
print("=" * 60)
print("METHOD 1: Company Info & Metadata")
print("=" * 60)

company = data['company']
print(f"  Company   : {company['name']}")
print(f"  Founded   : {company['founded']}")
print(f"  HQ        : {company['headquarters']}")
print(f"  Industry  : {company['industry']}")
print(f"  Employees : {company['total_employees']}")

meta = data['metadata']
print(f"\n  Version   : {meta['version']}")
print(f"  Created   : {meta['created_at']}")
print(f"  Class.    : {meta['classification']}")

# ── Method 2: Nested structure — departments + employees ─────────────────────
print("\n" + "=" * 60)
print("METHOD 2: Departments with Nested Employees")
print("=" * 60)

for dept in data['departments']:
    print(f"\n  [{dept['dept_id']}] {dept['name']}  |  Head: {dept['head']}  |  Budget: ${dept['budget_usd']:,}")
    print(f"  {'-' * 52}")
    for emp in dept['employees']:
        print(f"    {emp['emp_id']} {emp['name']:20s} | {emp['role']:25s} | {emp['status']}")
        print(f"   {'':10s} Skills : {', '.join(emp['skills'])}")
        print(f"   {'':10s} Salary : ${emp['salary_usd']:,} | Joined: {emp['joined']}")
        print(f"   {'':10s} Q1 Score: {emp['performance']['q1_2026']}")
        print(f"   {'':10s} City: {emp['address']['city']}, {emp['address']['state']}")
        print()

# ── Method 3: Flatten employees into a DataFrame ──────────────────────────────
print("=" * 60)
print("METHOD 3: All Employees → Flat DataFrame")
print("=" * 60)

rows = []
for dept in data['departments']:
    for emp in dept['employees']:
        rows.append({
            'emp_id'      : emp['emp_id'],
            'name'        : emp['name'],
            'department'  : dept['name'],
            'role'        : emp['role'],
            'salary_usd'  : emp['salary_usd'],
            'status'      : emp['status'],
            'joined'      : emp['joined'],
            'q1_score'    : emp['performance']['q1_2026'],
            'city'        : emp['address']['city'],
            'skills_count': len(emp['skills']),
        })

df_emp = pd.DataFrame(rows)
print(df_emp.to_string(index=False))

# Aggregation — avg salary by department
print("\n  Average Salary by Department:")
print(df_emp.groupby('department')['salary_usd']
      .mean().apply(lambda x: f"${x:,.0f}").to_string())

# ── Method 4: Products list → DataFrame with filtering ───────────────────────
print("\n" + "=" * 60)
print("METHOD 4: Products → DataFrame with Filtering")
print("=" * 60)

df_prod = pd.DataFrame(data['products'])
print(df_prod.to_string(index=False))

print("\n  Active products only:")
active = df_prod[df_prod['status'] == 'Active']
print(active[['name', 'category', 'price_usd', 'q1_revenue']].to_string(index=False))

print(f"\n  Total Q1 Revenue (Active): ${active['q1_revenue'].sum():,}")

# ── Method 5: Deeply nested access — financials comparison ───────────────────
print("\n" + "=" * 60)
print("METHOD 5: Financials — Quarter-over-Quarter")
print("=" * 60)

financials = data['financials']
for quarter, stats in financials.items():
    print(f"\n  {quarter.upper()}")
    print(f"    Revenue  : ${stats['total_revenue_usd']:,}")
    print(f"    Expenses : ${stats['total_expenses_usd']:,}")
    print(f"    Profit   : ${stats['net_profit_usd']:,}")
    growth_key = [k for k in stats if 'growth' in k][0]
    print(f"    Growth   : {stats[growth_key]}%")

# ── Method 6: Search & filter across nested structure ─────────────────────────
print("\n" + "=" * 60)
print("METHOD 6: Search Across Nested Data")
print("=" * 60)

# Find all employees with a specific skill
search_skill = "Python"
print(f"\n  Employees with skill '{search_skill}':")
for dept in data['departments']:
    for emp in dept['employees']:
        if search_skill in emp['skills']:
            print(f"    → {emp['name']} ({dept['name']})")

# Find all Active employees with Q1 score > 4.5
print(f"\n  High performers (Q1 score > 4.5 & Active):")
for dept in data['departments']:
    for emp in dept['employees']:
        if emp['status'] == 'Active' and emp['performance']['q1_2026'] > 4.5:
            print(f"    → {emp['name']:20s} | Score: {emp['performance']['q1_2026']} | {dept['name']}")

# ── Method 7: Full structure summary ─────────────────────────────────────────
print("\n" + "=" * 60)
print("METHOD 7: Document Structure Summary")
print("=" * 60)

print(f"  Top-level keys : {list(data.keys())}")
print(f"  Departments    : {len(data['departments'])}")
print(f"  Total Employees: {sum(len(d['employees']) for d in data['departments'])}")
print(f"  Products       : {len(data['products'])}")
print(f"  Financial Qtrs : {len(data['financials'])}")
print(f"  Unique Skills  : {len(set(s for d in data['departments'] for e in d['employees'] for s in e['skills']))}")

Output :



Important Information 

  • Recently, Google introduced a parser called Layout Parser
  • It hides the extraction logic: we can upload a file in almost any format and it extracts the data for us
  • We don't know what happens inside; we just need to purchase their API key, but it does the work for us
  • Explore -  https://docs.cloud.google.com/document-ai/docs/layout-parse-quickstart

We can simply use the Layout Parser above, but we cannot keep our data secure.

  • If we need to process confidential data, we must mask it first and then extract - in the guardrails discussion, we will see how to mask such data
  • We need evaluation techniques for all the above file-handling mechanisms - that is, we compare the data before and after extraction, and the two must match
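The before/after comparison above can be sketched as a minimal fidelity check. This is an illustrative helper (the function names and the whitespace/case normalization rule are assumptions, not from the blog's code); real pipelines may diff page by page or compare checksums instead.

```python
import re

def normalize(text: str) -> str:
    """Collapse whitespace and lowercase so pure layout differences
    don't count as data loss."""
    return re.sub(r"\s+", " ", text).strip().lower()

def extraction_matches(source_text: str, extracted_text: str) -> bool:
    """True when the extracted text carries the same content as the source."""
    return normalize(source_text) == normalize(extracted_text)

print(extraction_matches("Revenue grew  12%\nin Q1.", "revenue grew 12% in q1."))  # → True
```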


  • In the above image, during indexing - if we implement the HNSW indexing technique with a p99 latency target, then we need to measure p99 latency at every step: data extraction, chunking, embedding and the indexing itself. This is a very important point to understand. We will see what the Flat, IVF-PQ and HNSW indexing techniques are in the later part of this blog.
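Measuring p99 per step can be as simple as collecting per-request timings and taking the 99th percentile. The step names and sample values below are made up for illustration:

```python
import numpy as np

# Suppose we recorded per-request latencies (ms) for each indexing step
latencies_ms = {
    "extraction": [120, 135, 128, 900, 122],
    "chunking":   [15, 18, 14, 16, 60],
    "embedding":  [210, 230, 215, 1200, 220],
}

# p99 is dominated by the slowest requests, which averages would hide
for step, samples in latencies_ms.items():
    p99 = np.percentile(samples, 99)
    print(f"{step:10s} p99 = {p99:.1f} ms")
```

Note how a single slow outlier (900 ms, 1200 ms) drives the p99 for a step even when the average looks healthy; that is exactly why the target must be tracked at every stage, not just end to end.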

Conclusion for Data extraction as part of Indexing :

So far, we have seen how to extract data from multiple source types like PDF, screenshots in PDF format, docx, PPT, xlsx, HTML, JSON etc. Now that we have clarity on extracting data from different source types, let's look at the next part of indexing, i.e. chunking.


Chunking 

Chunking is the process of splitting large documents into smaller, semantically meaningful units to enable efficient embedding, accurate retrieval, and better context injection in RAG systems. 

Instead of processing a full document, such as a 100+ page PDF, as one unit, you split it into smaller sections such as:

  • Paragraphs
  • Sections
  • Sliding windows of text (200 - 500 tokens etc.)


Why do we need chunking ?

Embedding models (like OpenAI embeddings) have input limits. You can't embed very large documents directly. Chunking ensures each piece fits in the model's context window. Without chunking, data gets truncated or the call fails.

  • RAG works by retrieving relevant chunks, not entire documents
  • Chunking ensures better semantic matching
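The input-limit point can be made concrete with a back-of-the-envelope check. The ~4-characters-per-token heuristic is a rough approximation (exact counts need a tokenizer such as tiktoken), and 8191 is the documented input token limit of text-embedding-3-small:

```python
def estimate_tokens(text: str) -> int:
    """Rough token estimate: ~4 characters per token for English text."""
    return len(text) // 4

doc = "word " * 50_000      # a large document (~250k characters)
limit = 8191                # input token limit of text-embedding-3-small

tokens = estimate_tokens(doc)
print(f"~{tokens} tokens; needs chunking: {tokens > limit}")
```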

As shown in the above image, below are 8 chunking techniques.

1) Fixed-Size chunking 

  • Split text into equal-sized chunks of characters
  • Disadvantage: we might lose context if we break in the middle of a sentence

Implementation :
import re
import json
import pdfplumber
import pypdf
import tiktoken
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
)
 
PDF_PATH = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\financial_report_2024.pdf"
 
# ─────────────────────────────────────────────────────────────────────────────
# HELPER: Extract raw text from PDF
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text.strip()
 
 
def print_chunks(chunks: list[dict], strategy: str, max_show: int = 3):
    print(f"\n{'='*65}")
    print(f"  Strategy : {strategy}")
    print(f"  Total    : {len(chunks)} chunks")
    print(f"  Showing  : first {min(max_show, len(chunks))} chunks")
    print(f"{'='*65}")
    for i, chunk in enumerate(chunks[:max_show], 1):
        text = chunk.get("text", "")
        meta = {k: v for k, v in chunk.items() if k != "text"}
        print(f"\n  ── Chunk {i} ──────────────────────────────────────────")
        print(f"  Metadata : {json.dumps(meta, indent=None)}")
        print(f"  Length   : {len(text)} chars")
        print(f"  Text     :\n  {text[:300].strip()}{'...' if len(text) > 300 else ''}")
    print()
 
 
# ─────────────────────────────────────────────────────────────────────────────
# STEP 0: Load the PDF
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "="*65)
print("  Loading: financial_report_2024.pdf")
print("="*65)
 
raw_text = extract_text_from_pdf(PDF_PATH)
print(f"  Extracted {len(raw_text)} characters, {len(raw_text.split())} words")
print(f"\n  Preview:\n  {raw_text[:300]}...")


def fixed_size_chunking(text: str, chunk_size: int = 300, overlap: int = 50) -> list[dict]:
    chunks = []
    start = 0
    chunk_index = 0
    while start < len(text):
        end = start + chunk_size
        chunk_text = text[start:end].strip()
        if chunk_text:
            chunks.append({
                "chunk_id"    : f"fixed_{chunk_index}",
                "strategy"    : "fixed_size",
                "chunk_size"  : chunk_size,
                "overlap"     : overlap,
                "start_char"  : start,
                "end_char"    : end,
                "text"        : chunk_text,
            })
            chunk_index += 1
        start += chunk_size - overlap  # slide forward with overlap
    return chunks
 
 
fixed_chunks = fixed_size_chunking(raw_text, chunk_size=300, overlap=50)
print_chunks(fixed_chunks, "1. Fixed-Size Chunking (size=300, overlap=50)")

Output :

Note : Observe how we implemented overlap inside the fixed_size_chunking() function


2) Recursive character text splitting 
  • Recursively split using a priority list of separators (paragraphs, newlines, sentences, words)
  • It retains context well, and we can use this in production

Implementation :
import re
import json
import pdfplumber
import pypdf
import tiktoken
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
)
 
PDF_PATH = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\financial_report_2024.pdf"
 
# ─────────────────────────────────────────────────────────────────────────────
# HELPER: Extract raw text from PDF
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text.strip()
 
 
def print_chunks(chunks: list[dict], strategy: str, max_show: int = 3):
    print(f"\n{'='*65}")
    print(f"  Strategy : {strategy}")
    print(f"  Total    : {len(chunks)} chunks")
    print(f"  Showing  : first {min(max_show, len(chunks))} chunks")
    print(f"{'='*65}")
    for i, chunk in enumerate(chunks[:max_show], 1):
        text = chunk.get("text", "")
        meta = {k: v for k, v in chunk.items() if k != "text"}
        print(f"\n  ── Chunk {i} ──────────────────────────────────────────")
        print(f"  Metadata : {json.dumps(meta, indent=None)}")
        print(f"  Length   : {len(text)} chars")
        print(f"  Text     :\n  {text[:300].strip()}{'...' if len(text) > 300 else ''}")
    print()
 
 
# ─────────────────────────────────────────────────────────────────────────────
# STEP 0: Load the PDF
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "="*65)
print("  Loading: financial_report_2024.pdf")
print("="*65)
 
raw_text = extract_text_from_pdf(PDF_PATH)
print(f"  Extracted {len(raw_text)} characters, {len(raw_text.split())} words")
print(f"\n  Preview:\n  {raw_text[:300]}...")

def recursive_character_chunking(text: str, chunk_size: int = 400, overlap: int = 80) -> list[dict]:
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=overlap,
        separators=["\n\n", "\n", ". ", " ", ""],
        length_function=len,
    )
    raw_chunks = splitter.split_text(text)
    return [
        {
            "chunk_id"  : f"recursive_{i}",
            "strategy"  : "recursive_character",
            "chunk_size": chunk_size,
            "overlap"   : overlap,
            "text"      : chunk.strip(),
        }
        for i, chunk in enumerate(raw_chunks) if chunk.strip()
    ]
 
 
recursive_chunks = recursive_character_chunking(raw_text, chunk_size=400, overlap=80)
print_chunks(recursive_chunks, "2. Recursive Character Text Splitting (size=400, overlap=80)")

Output :

Simply put, we chunk based on the separators specified in the code. The separators can be anything, depending on the structure of the text. 


3) Semantic chunking (Manual + LLM)
  • Split text into meaningful sections based on topics/headers
  • This is another recommended chunking technique in production
  • Disadvantage: chunks are not all the same size
  • If we deal with millions of documents, it is hard to identify headers across all of them
  • Another way to handle this is to use an LLM - it takes care of chunking based on headers even when we deal with millions of documents, but if we use an LLM, we pay for tokens
  • If all the documents share the same structure, semantic chunking is the best option in production

Implementation (using LLM) :

def llm_semantic_chunking(text: str) -> list[dict]:
    # Assumption: `llm` (a ChatOpenAI client) and SystemMessage/HumanMessage
    # are already created/imported; this prompt is an illustrative sketch
    prompt = (
        "Split the following document into semantically coherent sections. "
        'Return ONLY a JSON array of objects with keys "section" and "text".\n\n'
        f"{text}"
    )
    response = llm.invoke([
        SystemMessage(content="You are a document chunking expert."),
        HumanMessage(content=prompt)
    ])

    content = response.content

    # Parse JSON safely
    try:
        parsed_chunks = json.loads(content)
    except json.JSONDecodeError:
        print("⚠️ JSON parsing failed. Raw response:", content)
        return []

    # Add metadata to each parsed chunk
    chunks = []
    for i, chunk in enumerate(parsed_chunks):
        chunks.append({
            "chunk_id": f"llm_section_{i}",
            "strategy": "llm_semantic",
            "section": chunk.get("section", "Unknown"),
            "word_count": len(chunk.get("text", "").split()),
            "text": chunk.get("text", "").strip(),
        })

    return chunks
semantic_chunks = llm_semantic_chunking(raw_text)
print_chunks(semantic_chunks, "3. Semantic / Section-Aware Chunking", max_show=5)

Output :



4) Sentence-Level chunking
  • Group a fixed number of sentences per chunk, with optional overlap
  • Note that overlap is optional here

Implementation :
import re
import json
import pdfplumber
import pypdf
import tiktoken
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
)
 
PDF_PATH = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\financial_report_2024.pdf"
 
# ─────────────────────────────────────────────────────────────────────────────
# HELPER: Extract raw text from PDF
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text.strip()
 
 
def print_chunks(chunks: list[dict], strategy: str, max_show: int = 3):
    print(f"\n{'='*65}")
    print(f"  Strategy : {strategy}")
    print(f"  Total    : {len(chunks)} chunks")
    print(f"  Showing  : first {min(max_show, len(chunks))} chunks")
    print(f"{'='*65}")
    for i, chunk in enumerate(chunks[:max_show], 1):
        text = chunk.get("text", "")
        meta = {k: v for k, v in chunk.items() if k != "text"}
        print(f"\n  ── Chunk {i} ──────────────────────────────────────────")
        print(f"  Metadata : {json.dumps(meta, indent=None)}")
        print(f"  Length   : {len(text)} chars")
        print(f"  Text     :\n  {text[:300].strip()}{'...' if len(text) > 300 else ''}")
    print()
 
 
# ─────────────────────────────────────────────────────────────────────────────
# STEP 0: Load the PDF
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "="*65)
print("  Loading: financial_report_2024.pdf")
print("="*65)
 
raw_text = extract_text_from_pdf(PDF_PATH)
print(f"  Extracted {len(raw_text)} characters, {len(raw_text.split())} words")
print(f"\n  Preview:\n  {raw_text[:300]}...")


def sentence_chunking(text: str, sentences_per_chunk: int = 3, overlap: int = 1) -> list[dict]:
    # Basic sentence splitter (works well for clean PDF text)
    raw_sentences = re.split(r'(?<=[.!?])\s+', text)
    sentences = [s.strip() for s in raw_sentences if len(s.strip()) > 20]
 
    chunks = []
    step = sentences_per_chunk - overlap
    for i in range(0, len(sentences), step):
        group = sentences[i : i + sentences_per_chunk]
        if not group:
            continue
        chunk_text = " ".join(group)
        chunks.append({
            "chunk_id"         : f"sentence_{len(chunks)}",
            "strategy"         : "sentence_level",
            "sentences_per_chunk": sentences_per_chunk,
            "sentence_start"   : i,
            "sentence_end"     : i + len(group),
            "text"             : chunk_text,
        })
    return chunks
 
 
sentence_chunks = sentence_chunking(raw_text, sentences_per_chunk=3, overlap=1)
print_chunks(sentence_chunks, "4. Sentence-Level Chunking (3 sentences, overlap=1)")

Output :



5) Token-Based chunking
  • Split text into chunks based on the number of tokens (using the model's tokenizer)
  • Note overlap is optional here

Implementation :
import re
import json
import pdfplumber
import pypdf
import tiktoken
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
)
 
PDF_PATH = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\financial_report_2024.pdf"
 
# ─────────────────────────────────────────────────────────────────────────────
# HELPER: Extract raw text from PDF
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text.strip()
 
 
def print_chunks(chunks: list[dict], strategy: str, max_show: int = 3):
    print(f"\n{'='*65}")
    print(f"  Strategy : {strategy}")
    print(f"  Total    : {len(chunks)} chunks")
    print(f"  Showing  : first {min(max_show, len(chunks))} chunks")
    print(f"{'='*65}")
    for i, chunk in enumerate(chunks[:max_show], 1):
        text = chunk.get("text", "")
        meta = {k: v for k, v in chunk.items() if k != "text"}
        print(f"\n  ── Chunk {i} ──────────────────────────────────────────")
        print(f"  Metadata : {json.dumps(meta, indent=None)}")
        print(f"  Length   : {len(text)} chars")
        print(f"  Text     :\n  {text[:300].strip()}{'...' if len(text) > 300 else ''}")
    print()
 
 
# ─────────────────────────────────────────────────────────────────────────────
# STEP 0: Load the PDF
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "="*65)
print("  Loading: financial_report_2024.pdf")
print("="*65)
 
raw_text = extract_text_from_pdf(PDF_PATH)
print(f"  Extracted {len(raw_text)} characters, {len(raw_text.split())} words")
print(f"\n  Preview:\n  {raw_text[:300]}...")

# ─────────────────────────────────────────────────────────────────────────────
# STRATEGY 5: Token-Based Chunking
# ─────────────────────────────────────────────────────────────────────────────
# Best for: LLM APIs with token limits (OpenAI, Claude, etc.)
# Logic:    Count tokens via tiktoken; never exceed model context window
# ─────────────────────────────────────────────────────────────────────────────
def token_based_chunking(text: str, max_tokens: int = 150, overlap_tokens: int = 20) -> list[dict]:
    # Initialize tokenizer (same used by GPT-4 / GPT-4.1 family)
    encoding = tiktoken.get_encoding("cl100k_base")

    # Convert text → token IDs
    tokens = encoding.encode(text)

    chunks = []
    start = 0

    while start < len(tokens):
        end = min(start + max_tokens, len(tokens))

        # Slice tokens
        token_slice = tokens[start:end]

        # Convert tokens back → text
        chunk_text = encoding.decode(token_slice).strip()

        if chunk_text:
            chunks.append({
                "chunk_id"     : f"token_{len(chunks)}",
                "strategy"     : "token_based",
                "max_tokens"   : max_tokens,
                "actual_tokens": len(token_slice),
                "token_start"  : start,
                "token_end"    : end,
                "text"         : chunk_text,
            })

        # Move forward with overlap
        start += max_tokens - overlap_tokens

    return chunks

token_chunks = token_based_chunking(raw_text, max_tokens=150, overlap_tokens=20)
print_chunks(token_chunks, "5. Token-Based Chunking (max=150 tokens, overlap=20)")

Output :



6) Sliding-Window chunking (overlap is mandatory)
  • Use a fixed-size window that slides forward by a step size, creating overlapping chunks
  • Overlap is not optional here; it is built into the technique

Implementation :
import re
import json
import pdfplumber
import pypdf
import tiktoken
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
)
 
PDF_PATH = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\financial_report_2024.pdf"
 
# ─────────────────────────────────────────────────────────────────────────────
# HELPER: Extract raw text from PDF
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text.strip()
 
 
def print_chunks(chunks: list[dict], strategy: str, max_show: int = 3):
    print(f"\n{'='*65}")
    print(f"  Strategy : {strategy}")
    print(f"  Total    : {len(chunks)} chunks")
    print(f"  Showing  : first {min(max_show, len(chunks))} chunks")
    print(f"{'='*65}")
    for i, chunk in enumerate(chunks[:max_show], 1):
        text = chunk.get("text", "")
        meta = {k: v for k, v in chunk.items() if k != "text"}
        print(f"\n  ── Chunk {i} ──────────────────────────────────────────")
        print(f"  Metadata : {json.dumps(meta, indent=None)}")
        print(f"  Length   : {len(text)} chars")
        print(f"  Text     :\n  {text[:300].strip()}{'...' if len(text) > 300 else ''}")
    print()
 
 
# ─────────────────────────────────────────────────────────────────────────────
# STEP 0: Load the PDF
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "="*65)
print("  Loading: financial_report_2024.pdf")
print("="*65)
 
raw_text = extract_text_from_pdf(PDF_PATH)
print(f"  Extracted {len(raw_text)} characters, {len(raw_text.split())} words")
print(f"\n  Preview:\n  {raw_text[:300]}...")


def sliding_window_chunking(text: str, window_size: int = 400, step_size: int = 100) -> list[dict]:
    words = text.split()
    chunks = []
    for i in range(0, len(words), step_size):
        window_words = words[i : i + window_size]
        if len(window_words) < 20:  # skip tiny trailing chunks
            break
        chunk_text = " ".join(window_words)
        chunks.append({
            "chunk_id"   : f"window_{len(chunks)}",
            "strategy"   : "sliding_window",
            "window_size": window_size,
            "step_size"  : step_size,
            "word_start" : i,
            "word_end"   : i + len(window_words),
            "overlap_pct": round((1 - step_size / window_size) * 100, 1),
            "text"       : chunk_text,
        })
    return chunks
 
 
window_chunks = sliding_window_chunking(raw_text, window_size=120, step_size=40)
print_chunks(window_chunks, "6. Sliding Window Chunking (window=120w, step=40w, 67% overlap)")

Output :




7) Table-Aware chunking
  • Keep tables as separate chunks and the surrounding prose as separate chunks

Implementation :
import re
import json
import pdfplumber
import pypdf
import tiktoken
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
)
 
PDF_PATH = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\financial_report_2024.pdf"
 
# ─────────────────────────────────────────────────────────────────────────────
# HELPER: Extract raw text from PDF
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text.strip()
 
 
def print_chunks(chunks: list[dict], strategy: str, max_show: int = 3):
    print(f"\n{'='*65}")
    print(f"  Strategy : {strategy}")
    print(f"  Total    : {len(chunks)} chunks")
    print(f"  Showing  : first {min(max_show, len(chunks))} chunks")
    print(f"{'='*65}")
    for i, chunk in enumerate(chunks[:max_show], 1):
        text = chunk.get("text", "")
        meta = {k: v for k, v in chunk.items() if k != "text"}
        print(f"\n  ── Chunk {i} ──────────────────────────────────────────")
        print(f"  Metadata : {json.dumps(meta, indent=None)}")
        print(f"  Length   : {len(text)} chars")
        print(f"  Text     :\n  {text[:300].strip()}{'...' if len(text) > 300 else ''}")
    print()
 
 
# ─────────────────────────────────────────────────────────────────────────────
# STEP 0: Load the PDF
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "="*65)
print("  Loading: financial_report_2024.pdf")
print("="*65)
 
raw_text = extract_text_from_pdf(PDF_PATH)
print(f"  Extracted {len(raw_text)} characters, {len(raw_text.split())} words")
print(f"\n  Preview:\n  {raw_text[:300]}...")


def table_aware_chunking(text: str) -> list[dict]:
    lines = text.split("\n")
    chunks = []
    prose_buffer = []
    table_buffer = []
 
    def flush_prose():
        if prose_buffer:
            prose_text = " ".join(" ".join(prose_buffer).split())
            if prose_text.strip():
                chunks.append({
                    "chunk_id" : f"prose_{len(chunks)}",
                    "strategy" : "table_aware",
                    "type"     : "prose",
                    "text"     : prose_text,
                })
            prose_buffer.clear()
 
    def flush_table():
        if table_buffer:
            table_text = "\n".join(table_buffer)
            if table_text.strip():
                chunks.append({
                    "chunk_id" : f"table_{len(chunks)}",
                    "strategy" : "table_aware",
                    "type"     : "table",
                    "rows"     : len(table_buffer),
                    "text"     : table_text,
                })
            table_buffer.clear()
 
    def is_table_row(line: str) -> bool:
        # Table rows have multiple large whitespace gaps (tabular layout)
        return bool(re.search(r'\s{3,}', line)) and len(line.strip()) > 10
 
    for line in lines:
        if is_table_row(line):
            flush_prose()
            table_buffer.append(line)
        else:
            flush_table()
            if line.strip():
                prose_buffer.append(line.strip())
 
    flush_prose()
    flush_table()
    return chunks
 
 
table_chunks = table_aware_chunking(raw_text)
print_chunks(table_chunks, "7. Table-Aware Chunking", max_show=6)

Output :



8) Parent-Child chunking
  • Create large parent chunks (sections) and small child chunks inside them
  • Observe that for each section, it creates one parent chunk and multiple child chunks

Implementation :
import re
import json
import pdfplumber
import pypdf
import tiktoken
from langchain_text_splitters import (
    RecursiveCharacterTextSplitter,
    CharacterTextSplitter,
)
 
PDF_PATH = "D:\\GenAI Content\\AI code\\4_RAG_Indexing\\1_Document_Processing\\Data\\financial_report_2024.pdf"
 
# ─────────────────────────────────────────────────────────────────────────────
# HELPER: Extract raw text from PDF
# ─────────────────────────────────────────────────────────────────────────────
def extract_text_from_pdf(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            page_text = page.extract_text()
            if page_text:
                text += page_text + "\n"
    return text.strip()
 
 
def print_chunks(chunks: list[dict], strategy: str, max_show: int = 3):
    print(f"\n{'='*65}")
    print(f"  Strategy : {strategy}")
    print(f"  Total    : {len(chunks)} chunks")
    print(f"  Showing  : first {min(max_show, len(chunks))} chunks")
    print(f"{'='*65}")
    for i, chunk in enumerate(chunks[:max_show], 1):
        text = chunk.get("text", "")
        meta = {k: v for k, v in chunk.items() if k != "text"}
        print(f"\n  ── Chunk {i} ──────────────────────────────────────────")
        print(f"  Metadata : {json.dumps(meta, indent=None)}")
        print(f"  Length   : {len(text)} chars")
        print(f"  Text     :\n  {text[:300].strip()}{'...' if len(text) > 300 else ''}")
    print()
 
 
# ─────────────────────────────────────────────────────────────────────────────
# STEP 0: Load the PDF
# ─────────────────────────────────────────────────────────────────────────────
print("\n" + "="*65)
print("  Loading: financial_report_2024.pdf")
print("="*65)
 
raw_text = extract_text_from_pdf(PDF_PATH)
print(f"  Extracted {len(raw_text)} characters, {len(raw_text.split())} words")
print(f"\n  Preview:\n  {raw_text[:300]}...")

# ─────────────────────────────────────────────────────────────────────────────
# HELPER (reused from Strategy 3): Semantic / Section-Aware Chunking
# ─────────────────────────────────────────────────────────────────────────────
# Best for: Documents with clear headings (reports, contracts, manuals)
# Logic:    Detect section headers via regex → split on them; these sections
#           become the parents in the hierarchical strategy below
# ─────────────────────────────────────────────────────────────────────────────
def semantic_section_chunking(text: str) -> list[dict]:
    # Known section headers in this financial report
    section_pattern = re.compile(
        r"(?m)^(Executive Summary|Revenue Growth|Key Metrics|Revenue by Segment|FY 2025 Outlook)\s*$"
    )
    splits = section_pattern.split(text)
 
    chunks = []
    section_name = "Preamble"
    for part in splits:
        part = part.strip()
        if not part:
            continue
        if section_pattern.match(part):
            section_name = part
        else:
            chunks.append({
                "chunk_id"   : f"section_{len(chunks)}",
                "strategy"   : "semantic_section",
                "section"    : section_name,
                "word_count" : len(part.split()),
                "text"       : part,
            })
    return chunks


def hierarchical_chunking(text: str, child_size: int = 150) -> list[dict]:
    # Reuse semantic sections as parents
    parents = semantic_section_chunking(text)
 
    all_chunks = []
    for parent in parents:
        parent_id = parent["chunk_id"].replace("section_", "parent_")
 
        # Store parent
        all_chunks.append({
            "chunk_id"  : parent_id,
            "strategy"  : "hierarchical",
            "level"     : "parent",
            "section"   : parent["section"],
            "text"      : parent["text"],
        })
 
        # Split parent into smaller child chunks
        splitter = RecursiveCharacterTextSplitter(
            chunk_size=child_size,
            chunk_overlap=30,
            separators=["\n", ". ", " "],
        )
        children = splitter.split_text(parent["text"])
        for j, child_text in enumerate(children):
            if child_text.strip():
                all_chunks.append({
                    "chunk_id"   : f"{parent_id}_child_{j}",
                    "strategy"   : "hierarchical",
                    "level"      : "child",
                    "parent_id"  : parent_id,
                    "section"    : parent["section"],
                    "text"       : child_text.strip(),
                })
    return all_chunks
 
 
hierarchical_chunks = hierarchical_chunking(raw_text, child_size=150)
parents  = [c for c in hierarchical_chunks if c["level"] == "parent"]
children = [c for c in hierarchical_chunks if c["level"] == "child"]
 
print(f"\n{'='*65}")
print(f"  Strategy : 8. Hierarchical / Parent-Child Chunking")
print(f"  Parents  : {len(parents)} | Children: {len(children)} | Total: {len(hierarchical_chunks)}")
print(f"{'='*65}")
for p in parents:
    kids = [c for c in children if c["parent_id"] == p["chunk_id"]]
    print(f"\n  PARENT [{p['chunk_id']}] — Section: {p['section']}")
    print(f"  Parent text ({len(p['text'])} chars): {p['text'][:120]}...")
    print(f"  Children ({len(kids)}):")
    for k in kids[:2]:
        print(f"    └─ [{k['chunk_id']}] {k['text'][:100]}...")


Output :



To conclude :
  • Recursive character text splitting
  • Semantic chunking
  • Parent-Child chunking 
are the 3 main and most widely used chunking techniques in production, but if we have a scenario that suits one of the other techniques, we can still use it.

Conclusion for Chunking as part of Indexing :

We are done with chunking strategies. Out of all 8 techniques, Semantic, Recursive character text splitting and Parent-Child are the production-recommended strategies. We can also use LLM calls for chunking by writing proper prompts for the above strategies, but remember that this involves cost.



Remaining topics in this blog :
  • Embeddings & Cost
  • Vector Store Vs Vector DB
  • Indexing Mechanism 
    • Flat Indexing
    • IVF-PQ Indexing
    • HNSW Indexing
  • Meta Data Filtering


Embeddings 

Embeddings are numerical vector representations of text that capture the meaning and context of content. When you split documents into chunks during indexing, each chunk is converted into a list of numbers (a vector), so that machines can understand the semantic meaning of the text.

An embedding might look like : [0.21, -0.45, 0.88, 0.13, ...] - usually hundreds to thousands of dimensions

Real Example : 

Document chunk: "AWS S3 provides object storage"

User asks: "Where can I store files in AWS?"

Even though the words differ, embeddings place them near each other in vector space.
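A toy sketch of that idea: nearness in vector space is typically measured with cosine similarity. The 4-dimensional vectors below are hand-made for illustration (real embeddings come from an embedding model and have hundreds to thousands of dimensions):

```python
import numpy as np

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors: 1.0 = same direction."""
    a, b = np.asarray(a, dtype=float), np.asarray(b, dtype=float)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

chunk_vec = [0.21, -0.45, 0.88, 0.13]   # "AWS S3 provides object storage"
query_vec = [0.19, -0.40, 0.91, 0.10]   # "Where can I store files in AWS?"
other_vec = [-0.70, 0.62, -0.05, 0.30]  # an unrelated sentence

print(cosine_similarity(chunk_vec, query_vec))  # close to 1.0 → retrieved
print(cosine_similarity(chunk_vec, other_vec))  # much lower → skipped
```

Retrieval in a vector store is essentially this comparison, done efficiently across millions of stored chunk vectors.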


Very Important point to remember :

Please note that the cost factor starts from this point, because we need to call an embedding model (a paid API, for hosted providers) to convert chunks into embeddings.


Important concept to remember regarding how we store embeddings in Vector store/DB :

  • Consider a statement - "AI is a powerful system"
  • When an LLM processes this line, it converts each character/word/sub-word into a token and assigns a token ID based on the model's vocabulary
  • Those token IDs are then converted into corresponding embeddings (learned during training as part of the neural network)
  • For each token ID, there is an embedding vector of 'n' dimensions, where 'n' depends on the model
    • For text-embedding-3-small - 1536 dimensions
    • For text-embedding-3-large - 3072 dimensions
    • These dimensions refer to the embedding of the whole input text, not the token embedding size inside GPT's internal layers. GPT models don't reveal their internal embedding structure.
  • Up to here, we are talking about the dimension of an embedding vector per token
  • But while storing these embedding vectors in a Vector store/DB, we store one embedding per chunk (NOT per word). This is called an aggregated or text-level semantic embedding

Providers like OpenAI, Anthropic (Claude), and Google (Gemini) decide how these text-level semantic embeddings are computed.


Available Embeddings :

  • OpenAI Embeddings - closed source and recommended for production
  • Hugging Face Embeddings
  • Open source embeddings


Consider what happens if the embedding model we use was not trained properly on a given domain's data - say it never saw Banking-domain text. Then it won't convert users' data into embeddings that capture the domain's semantics, which results in hallucinated or incorrect answers. An experienced AI developer catches this during the validation step of data extraction; a less experienced engineer might miss it and assume the problem lies somewhere else in the model. This is one of the most important points to understand.

That's the reason people have already started working on domain-specific models. If time permits, look at the white papers below:

  • https://arxiv.org/pdf/2409.18511v3 
  • https://huggingface.co/blog/nvidia/domain-specific-embedding-finetune 


Implementation of Embeddings :

"""
============================================================
Production RAG Embeddings — Strategy 8: Hierarchical Chunking
Input : financial_report_2024.pdf
Models: OpenAI text-embedding-3-small / text-embedding-3-large
        HuggingFace all-MiniLM-L6-v2
        BGE BAAI/bge-large-en-v1.5
============================================================
"""

import re
import json, os
import time
import pdfplumber
import numpy as np
from dataclasses import dataclass, field, asdict
from typing import Optional
from langchain_text_splitters import RecursiveCharacterTextSplitter
from dotenv import load_dotenv
load_dotenv()   # loads OPENAI_API_KEY from .env into the environment

PDF_PATH = r"D:\GenAI Content\AI code\4_RAG_Indexing\1_Document_Processing\Data\financial_report_2024.pdf"   # ← replace with your local path

# ─────────────────────────────────────────────────────────────────────────────
# MODEL REGISTRY  (figures as published by the providers; verify current pricing)
# ─────────────────────────────────────────────────────────────────────────────
EMBEDDING_MODELS = {
    # ── OpenAI ──────────────────────────────────────────────────────────────
    "openai/text-embedding-3-small": {
        "provider"       : "OpenAI",
        "dimensions"     : 1536,
        "max_tokens"     : 8191,
        "cost_per_1k_tok": 0.00002,        # $0.020 per 1M tokens
        "speed"          : "Fast (API)",
        "quality"        : "★★★★☆",
        "best_for"       : "Cost-efficient production RAG",
        "requires_api"   : True,
    },
    "openai/text-embedding-3-large": {
        "provider"       : "OpenAI",
        "dimensions"     : 3072,
        "max_tokens"     : 8191,
        "cost_per_1k_tok": 0.00013,        # $0.130 per 1M tokens
        "speed"          : "Medium (API)",
        "quality"        : "★★★★★",
        "best_for"       : "High-accuracy retrieval, legal/financial docs",
        "requires_api"   : True,
    },
    # ── HuggingFace ──────────────────────────────────────────────────────────
    "huggingface/all-MiniLM-L6-v2": {
        "provider"       : "HuggingFace",
        "dimensions"     : 384,
        "max_tokens"     : 512,
        "cost_per_1k_tok": 0.0,            # Free, runs locally
        "speed"          : "Very Fast (local CPU/GPU)",
        "quality"        : "★★★☆☆",
        "best_for"       : "Lightweight, free local embeddings",
        "requires_api"   : False,
    },
    "huggingface/all-mpnet-base-v2": {
        "provider"       : "HuggingFace",
        "dimensions"     : 768,
        "max_tokens"     : 514,
        "cost_per_1k_tok": 0.0,
        "speed"          : "Fast (local CPU/GPU)",
        "quality"        : "★★★★☆",
        "best_for"       : "Higher quality free local embeddings",
        "requires_api"   : False,
    },
    # ── BGE (Beijing Academy of AI) ──────────────────────────────────────────
    "bge/bge-small-en-v1.5": {
        "provider"       : "BGE (BAAI)",
        "dimensions"     : 384,
        "max_tokens"     : 512,
        "cost_per_1k_tok": 0.0,
        "speed"          : "Very Fast (local)",
        "quality"        : "★★★★☆",
        "best_for"       : "Best free small model; beats MiniLM on MTEB",
        "requires_api"   : False,
    },
    "bge/bge-large-en-v1.5": {
        "provider"       : "BGE (BAAI)",
        "dimensions"     : 1024,
        "max_tokens"     : 512,
        "cost_per_1k_tok": 0.0,
        "speed"          : "Medium (local GPU recommended)",
        "quality"        : "★★★★★",
        "best_for"       : "Best open-source model; near OpenAI quality",
        "requires_api"   : False,
    },
}

"""
Data classes module is a python built-in utility, helps create classes mainly used to store data

Example :

@dataclass
class Employee:
    name: Str
    age: int

Usage:
e = Employee("Arun", 36)
print(e)

Output:
Employee(name="Arun", age=36 )


Also, @dataclass is a decorator which automatically geenrate methods like
__init__()
__repr__()
__eq__()

from dataclasses import dataclass, field, asdict

field:  Used to customize attributes:
=====

from dataclasses import dataclass, field

@dataclass
class Team:
    members: list = field(default_factory=list)

This safely creates a new list for each pbject.

asdict: converts a dataclass object into a dictionary
======

Example :
from dataclasses import asdict

e = Employee("Arun", 36)
print(asdict(e))

Output:
{'name': 'Arun', 'age': 36}


@dataclass automatically creates:
__init__()
__repr__()
__eq__()

So you don’t need to manually write constructors.


"""

# ─────────────────────────────────────────────────────────────────────────────
# DATA CLASSES
# ─────────────────────────────────────────────────────────────────────────────
@dataclass
class Chunk:
    chunk_id  : str
    level     : str          # "parent" or "child"
    section   : str
    parent_id : Optional[str]
    text      : str
    word_count: int = 0

    # __post_init__() runs automatically after __init__() in a data class
    # Used if you want extra logic after object creation
    def __post_init__(self):
        self.word_count = len(self.text.split())

@dataclass
class EmbeddingResult:
    chunk_id   : str
    model      : str
    provider   : str
    dimensions : int
    vector     : list[float]
    token_count: int
    cost_usd   : float
    latency_ms : float


# ─────────────────────────────────────────────────────────────────────────────
# STEP 1 — Extract PDF text
# ─────────────────────────────────────────────────────────────────────────────
def extract_text(path: str) -> str:
    text = ""
    with pdfplumber.open(path) as pdf:
        for page in pdf.pages:
            t = page.extract_text()
            if t:
                text += t + "\n"
    return text.strip()


# ─────────────────────────────────────────────────────────────────────────────
# STEP 2 — Hierarchical Chunking (Strategy 8)
# ─────────────────────────────────────────────────────────────────────────────
def semantic_section_chunking(text: str) -> list[dict]:
    pattern = re.compile(
        r"(?m)^(Executive Summary|Revenue Growth|Key Metrics|Revenue by Segment|FY 2025 Outlook)\s*$"
    )
    splits = pattern.split(text)
    chunks, section_name = [], "Preamble"
    for part in splits:
        part = part.strip()
        if not part:
            continue
        if pattern.match(part):
            section_name = part
        else:
            chunks.append({"section": section_name, "text": part})
    return chunks


def hierarchical_chunking(text: str, child_size: int = 200) -> list[Chunk]:
    parents  = semantic_section_chunking(text)
    splitter = RecursiveCharacterTextSplitter(
        chunk_size=child_size, chunk_overlap=40,
        separators=["\n\n", "\n", ". ", " "],
    )
    all_chunks = []
    for i, p in enumerate(parents):
        parent_id = f"parent_{i}"
        all_chunks.append(Chunk(
            chunk_id=parent_id, level="parent",
            section=p["section"], parent_id=None, text=p["text"]
        ))
        for j, child_text in enumerate(splitter.split_text(p["text"])):
            if child_text.strip():
                all_chunks.append(Chunk(
                    chunk_id=f"{parent_id}_child_{j}", level="child",
                    section=p["section"], parent_id=parent_id,
                    text=child_text.strip()
                ))
    return all_chunks


# ─────────────────────────────────────────────────────────────────────────────
# STEP 3 — Token counting (word-based approximation)
# ─────────────────────────────────────────────────────────────────────────────
# Step 1: Split text into words & punctuation
# re.findall(r"\w+|[^\w\s]", text)

# 👉 This regex extracts:

# Words (\w+)
# Punctuation ([^\w\s])
# 📘 Example
# Input:
# "AI is powerful."
# Step 1: Token-like split
# ["AI", "is", "powerful", "."]

# 👉 Count = 4

# Step 2: Multiply by 1.3
# 4 * 1.3 = 5.2
# Step 3: Convert to int
# int(5.2) = 5 tokens (approx)
# 🎯 Why 1.3?

# 👉 Because:

# LLM tokens ≠ words
# On average:
# 1 word ≈ 1.3 tokens
def approx_token_count(text: str) -> int:
    """~1.3 tokens per word — standard GPT/BERT approximation."""
    return int(len(re.findall(r"\w+|[^\w\s]", text)) * 1.3)


# Step 1: Get cost per 1000 tokens

# Example:

# EMBEDDING_MODELS = {
#     "openai": {"cost_per_1k_tok": 0.0001}
# }
# Step 2: Convert tokens → cost

# Formula:

# cost = (token_count / 1000) * cost_per_1k
# 📘 Example
# Input:
# token_count = 500
# model_key = "openai"
# cost_per_1k = 0.0001
# Calculation:
# (500 / 1000) * 0.0001 = 0.00005
# Final Output:
# 0.00005

# 👉 Very small cost (as expected for embeddings)

def estimate_cost(token_count: int, model_key: str) -> float:
    cost_per_1k = EMBEDDING_MODELS[model_key]["cost_per_1k_tok"]
    return round((token_count / 1000) * cost_per_1k, 8)


# ─────────────────────────────────────────────────────────────────────────────
# STEP 4 — Embedding providers
# ─────────────────────────────────────────────────────────────────────────────

# ── 4a. OpenAI ────────────────────────────────────────────────────────────────
# 🔹 Step 1: Initialize OpenAI Client
# import openai
# client = openai.OpenAI(api_key=os.environ["OPENAI_API_KEY"])

# 👉 Uses your API key to connect to OpenAI

# 🔹 Step 2: Prepare Metadata
# model_key = f"openai/{model}"
# dims = EMBEDDING_MODELS[model_key]["dimensions"]

# 👉 Example:

# model = "text-embedding-3-small"
# dims = 1536

# 🔹 Step 3: Extract Text from Chunks
# texts = [c.text for c in chunks]
# Example:
# chunks = [
#   {"chunk_id": "c1", "text": "AI improves productivity"},
#   {"chunk_id": "c2", "text": "Machine learning enables automation"}
# ]

# 👉 Result:

# texts = [
#   "AI improves productivity",
#   "Machine learning enables automation"
# ]
# 🔹 Step 4: Call OpenAI Embedding API
# response = client.embeddings.create(
#     model=model,
#     input=texts
# )

# 👉 OpenAI returns:

# response.data = [
#   {"embedding": [0.12, 0.45, ...]},
#   {"embedding": [0.98, 0.33, ...]}
# ]
# 🔹 Step 5: Measure Latency
# t0 = time.time()
# latency = (time.time() - t0) * 1000

# 👉 Measures how long API call took (in ms)

# 🔹 Step 6: Loop Through Results
# for chunk, emb_obj in zip(chunks, response.data):

# 👉 Matches:

# each chunk
# with its embedding

# 🔹 Step 7: Token Count
# tok = approx_token_count(chunk.text)

# 👉 Example:

# "AI improves productivity" → ~5 tokens

# 🔹 Step 8: Cost Calculation
# cost_usd = estimate_cost(tok, model_key)

# 👉 Example:

# 5 tokens → very small cost (~0.0000005)

# 🔹 Step 9: Create Result Object
# EmbeddingResult(
#     chunk_id   = chunk.chunk_id,
#     model      = model,
#     provider   = "OpenAI",
#     dimensions = dims,
#     vector     = emb_obj.embedding,
#     token_count= tok,
#     cost_usd   = cost,
#     latency_ms = latency / len(chunks),
# )
# 📦 Final Output Example
# [
#   {
#     "chunk_id": "c1",
#     "vector": [0.12, 0.45, ...],
#     "token_count": 5,
#     "cost_usd": 0.0000005,
#     "latency_ms": 20
#   },
#   {
#     "chunk_id": "c2",
#     "vector": [0.98, 0.33, ...],
#     "token_count": 6,
#     "cost_usd": 0.0000006,
#     "latency_ms": 20
#   }
# ]

def embed_openai(chunks: list[Chunk], model: str = "text-embedding-3-small") -> list[EmbeddingResult]:
    """
    Real OpenAI embedding call.
    Replace OPENAI_KEY with your actual key to get live vectors.
    Docs: https://platform.openai.com/docs/guides/embeddings
    """
    try:
        import openai
        client = openai.OpenAI(api_key=os.environ["OPENAI_API_KEY"])

        results = []
        model_key = f"openai/{model}"
        dims = EMBEDDING_MODELS[model_key]["dimensions"]

        # Batch all child chunks for efficiency (OpenAI supports up to 2048 inputs)
        texts = [c.text for c in chunks]
        t0 = time.time()
        response = client.embeddings.create(model=model, input=texts)
        latency = (time.time() - t0) * 1000

        for i, (chunk, emb_obj) in enumerate(zip(chunks, response.data)):
            tok = approx_token_count(chunk.text)
            results.append(EmbeddingResult(
                chunk_id   = chunk.chunk_id,
                model      = model,
                provider   = "OpenAI",
                dimensions = dims,
                vector     = emb_obj.embedding,
                token_count= tok,
                cost_usd   = estimate_cost(tok, model_key),
                latency_ms = latency / len(chunks),
            ))
        return results

    except Exception as e:
        print(f"  [OpenAI] API call failed: {e}")
        print(f"  [OpenAI] Falling back to simulated vectors for cost/demo display.\n")
        return _simulate_embeddings(chunks, f"openai/{model}", "OpenAI")


# ── 4b. HuggingFace (local) ───────────────────────────────────────────────────
def embed_huggingface(chunks: list[Chunk], model_name: str = "all-MiniLM-L6-v2") -> list[EmbeddingResult]:
    """
    Local HuggingFace embedding via sentence-transformers.
    No API key needed. Runs on CPU or GPU.
    Install: pip install sentence-transformers
    """
    try:
        from sentence_transformers import SentenceTransformer
        model_key = f"huggingface/{model_name}"
        dims = EMBEDDING_MODELS[model_key]["dimensions"]

        print(f"  [HuggingFace] Loading model '{model_name}'...")
        model = SentenceTransformer(model_name)

        texts = [c.text for c in chunks]
        t0 = time.time()
        vectors = model.encode(texts, show_progress_bar=False, normalize_embeddings=True)
        latency = (time.time() - t0) * 1000

        results = []
        for chunk, vec in zip(chunks, vectors):
            tok = approx_token_count(chunk.text)
            results.append(EmbeddingResult(
                chunk_id   = chunk.chunk_id,
                model      = model_name,
                provider   = "HuggingFace",
                dimensions = dims,
                vector     = vec.tolist(),
                token_count= tok,
                cost_usd   = 0.0,           # free local model
                latency_ms = latency / len(chunks),
            ))
        return results

    except Exception as e:
        print(f"  [HuggingFace] Model load failed: {e}")
        print(f"  [HuggingFace] Falling back to simulated vectors.\n")
        return _simulate_embeddings(chunks, f"huggingface/{model_name}", "HuggingFace")


# ── 4c. BGE (local, via sentence-transformers) ────────────────────────────────
def embed_bge(chunks: list[Chunk], model_name: str = "BAAI/bge-large-en-v1.5") -> list[EmbeddingResult]:
    """
    BGE models from BAAI — best open-source embeddings on MTEB benchmark.
    Runs locally via sentence-transformers.
    BGE requires a query prefix for retrieval:
      - Passages (stored): no prefix needed
      - Queries (at search time): prefix with "Represent this sentence: "
    Install: pip install sentence-transformers
    """
    try:
        from sentence_transformers import SentenceTransformer
        short_key = model_name.split("/")[-1].lower()
        model_key = f"bge/{short_key}"
        dims = EMBEDDING_MODELS.get(model_key, {}).get("dimensions", 1024)

        print(f"  [BGE] Loading model '{model_name}'...")
        model = SentenceTransformer(model_name)

        # BGE: passages don't need prefix; queries need "Represent this: "
        texts = [c.text for c in chunks]
        t0 = time.time()
        vectors = model.encode(texts, show_progress_bar=False, normalize_embeddings=True)
        latency = (time.time() - t0) * 1000

        results = []
        for chunk, vec in zip(chunks, vectors):
            tok = approx_token_count(chunk.text)
            results.append(EmbeddingResult(
                chunk_id   = chunk.chunk_id,
                model      = model_name,
                provider   = "BGE (BAAI)",
                dimensions = dims,
                vector     = vec.tolist(),
                token_count= tok,
                cost_usd   = 0.0,
                latency_ms = latency / len(chunks),
            ))
        return results

    except Exception as e:
        print(f"  [BGE] Model load failed: {e}")
        print(f"  [BGE] Falling back to simulated vectors.\n")
        return _simulate_embeddings(chunks, f"bge/{model_name.split('/')[-1].lower()}", "BGE (BAAI)")


# ── Fallback: Simulate vectors for display when models can't be loaded ─────────
# 🧠 What this function does

# 👉 This function simulates embeddings (fake vectors) instead of calling a real API.

# It is used when:

# ❌ OpenAI API is not available
# ✅ You want to test/demo your pipeline

# 🔄 High-Level Flow
# Chunks → Generate fake vectors → Add metadata → Return results

# 📘 Step-by-Step Explanation

# 🔹 Step 1: Get model info
# info = EMBEDDING_MODELS.get(model_key, {"dimensions": 384, "cost_per_1k_tok": 0.0})
# dims = info["dimensions"]

# 👉 Example:

# model_key = "openai/text-embedding-3-small"
# dims = 1536

# 👉 If model not found:

# Default → 384 dimensions

# 🔹 Step 2: Fix random seed
# np.random.seed(42)

# 👉 This ensures:

# Same input → same output every time

# ✔ Useful for:

# Testing
# Debugging

# 🔹 Step 3: Loop through chunks
# for chunk in chunks:
# Example input:
# chunks = [
#   {"chunk_id": "c1", "text": "AI improves productivity"},
#   {"chunk_id": "c2", "text": "Machine learning enables automation"}
# ]

# 🔹 Step 4: Approximate token count
# tok = approx_token_count(chunk.text)

# 👉 Example:

# "AI improves productivity" → ~5 tokens

# 🔹 Step 5: Create random vector
# vec = np.random.randn(dims)

# 👉 Example (dims = 4 for simplicity):

# [0.2, -1.1, 0.5, 0.9]

# 🔹 Step 6: Normalize vector
# vec = vec / np.linalg.norm(vec)

# 👉 Makes vector length = 1

# Why?

# ✔ Important for:

# Cosine similarity
# Vector search

# 🔹 Step 7: Convert to list
# vec.tolist()

# 👉 So it can be stored in:

# JSON
# Vector DB

# 🔹 Step 8: Estimate cost
# cost_usd = estimate_cost(tok, model_key)

# 👉 Even though vector is fake:

# Cost is calculated correctly

# 🔹 Step 9: Simulate latency
# latency_ms = round(np.random.uniform(2, 8), 2)

# 👉 Example:

# 5.43 ms

# 🔹 Step 10: Create result object
# EmbeddingResult(
#     chunk_id   = chunk.chunk_id,
#     model      = model_key.split("/")[-1],
#     provider   = provider,
#     dimensions = dims,
#     vector     = vec,
#     token_count= tok,
#     cost_usd   = cost,
#     latency_ms = latency
# )

# 📦 Final Output Example
# [
#   {
#     "chunk_id": "c1",
#     "vector": [0.12, -0.45, 0.33, ...],
#     "dimensions": 1536,
#     "token_count": 5,
#     "cost_usd": 0.0000005,
#     "latency_ms": 4.2
#   },
#   {
#     "chunk_id": "c2",
#     "vector": [0.88, 0.21, -0.67, ...],
#     "dimensions": 1536,
#     "token_count": 6,
#     "cost_usd": 0.0000006,
#     "latency_ms": 6.1
#   }
# ]

# 🎯 Why This is Useful
# ✅ 1. No API required
# Works offline
# No cost

# ✅ 2. Pipeline testing

# You can test:

# Chunking → Embedding → Vector DB → Retrieval

# without real embeddings

# ✅ 3. Deterministic output

# Because of:

# np.random.seed(42)

# 👉 Same input → same vectors
# 👉 Easy debugging

# ⚠️ Important Limitation

# ❌ These vectors have:

# NO semantic meaning
# NO real similarity

# 👉 So:

# Not useful for real search
# Only for testing
# 🧠 Simple Analogy

# 👉 Real embedding:

# Meaningful fingerprint of text 🧠

# 👉 Simulated embedding:

# Random fingerprint 🎲

def _simulate_embeddings(chunks: list[Chunk], model_key: str, provider: str) -> list[EmbeddingResult]:
    """Produces deterministic mock vectors — dimensions/costs are 100% accurate."""
    info = EMBEDDING_MODELS.get(model_key, {"dimensions": 384, "cost_per_1k_tok": 0.0})
    dims = info["dimensions"]
    np.random.seed(42)
    results = []
    for chunk in chunks:
        tok = approx_token_count(chunk.text)
        vec = np.random.randn(dims)
        vec = (vec / np.linalg.norm(vec)).tolist()   # unit-normalized
        results.append(EmbeddingResult(
            chunk_id   = chunk.chunk_id,
            model      = model_key.split("/")[-1],
            provider   = provider,
            dimensions = dims,
            vector     = vec,
            token_count= tok,
            cost_usd   = estimate_cost(tok, model_key),
            latency_ms = round(np.random.uniform(2, 8), 2),
        ))
    return results


# ─────────────────────────────────────────────────────────────────────────────
# STEP 5 — Cost report printer
# ─────────────────────────────────────────────────────────────────────────────
def print_cost_report(results: list[EmbeddingResult], model_key: str):
    info       = EMBEDDING_MODELS[model_key]
    total_tok  = sum(r.token_count for r in results)
    total_cost = sum(r.cost_usd    for r in results)
    avg_lat    = sum(r.latency_ms  for r in results) / len(results)

    print(f"\n  ┌─ Cost & Performance Report ─────────────────────────────────┐")
    print(f"  │  Model      : {model_key}")
    print(f"  │  Provider   : {info['provider']}")
    print(f"  │  Dimensions : {info['dimensions']}")
    print(f"  │  Max Tokens : {info['max_tokens']}")
    print(f"  │  Quality    : {info['quality']}")
    print(f"  │  Speed      : {info['speed']}")
    print(f"  │  Best For   : {info['best_for']}")
    print(f"  ├─ This PDF ─────────────────────────────────────────────────────┤")
    print(f"  │  Chunks embedded  : {len(results)}")
    print(f"  │  Total tokens     : {total_tok:,}")
    print(f"  │  Cost (this file) : ${total_cost:.6f} USD")
    if info["cost_per_1k_tok"] > 0:
        cost_1m_docs = total_cost * 1_000_000
        cost_10k     = total_cost * 10_000
        print(f"  │  Cost @ 10K docs  : ${cost_10k:,.2f} USD")
        print(f"  │  Cost @ 1M  docs  : ${cost_1m_docs:,.2f} USD")
    else:
        print(f"  │  Cost @ any scale : $0.00 (runs locally — FREE)")
    print(f"  │  Avg latency/chunk: {avg_lat:.1f} ms")
    print(f"  └────────────────────────────────────────────────────────────────┘")


# 🧠 What this function does

# 👉 It prints a small preview of embedding vectors for debugging/inspection.

# Instead of printing full vectors (which are huge), it shows:

# First few child chunks
# First 6 values of each vector
# Metadata (tokens, cost, dimensions)

# 🔄 High-Level Flow
# Embedding Results → Filter child chunks → Take first N → Print summary

# 📘 Step-by-Step Explanation

# 🔹 Step 1: Print header
# print(f"\n  Sample vectors (first {n} child chunks):")

# 👉 Example:

# Sample vectors (first 2 child chunks):

# 🔹 Step 2: Filter only child chunks
# child_results = [r for r in results if "child" in r.chunk_id][:n]

# 👉 It:

# Picks only chunks with "child" in ID
# Takes first n results
# Example Input:
# results = [
#   {"chunk_id": "parent_0", "vector": [...]},
#   {"chunk_id": "parent_0_child_0", "vector": [...]},
#   {"chunk_id": "parent_0_child_1", "vector": [...]},
#   {"chunk_id": "parent_1_child_0", "vector": [...]}
# ]

# 👉 After filtering:

# [
#   "parent_0_child_0",
#   "parent_0_child_1"
# ]

# 🔹 Step 3: Loop through selected results
# for r in child_results:

# 🔹 Step 4: Take only first 6 vector values
# vec_preview = [round(v, 4) for v in r.vector[:6]]

# 👉 Example:

# Full vector:

# [0.123456, -0.987654, 0.456789, 0.111111, -0.222222, 0.333333, ...]

# 👉 Preview:

# [0.1235, -0.9877, 0.4568, 0.1111, -0.2222, 0.3333]

# 🔹 Step 5: Print chunk details
# print(f"    [{r.chunk_id}]")

# 👉 Example:

# [parent_0_child_0]

# 🔹 Step 6: Print metadata
# print(f"      dims={r.dimensions}  tokens={r.token_count}  cost=${r.cost_usd:.8f}")

# 👉 Example:

# dims=1536  tokens=120  cost=$0.00001200

# 🔹 Step 7: Print vector preview
# print(f"      vector[:6] = {vec_preview} ...")

# 👉 Example:

# vector[:6] = [0.1235, -0.9877, 0.4568, 0.1111, -0.2222, 0.3333] ...

# 📦 Final Output Example
# Sample vectors (first 2 child chunks):

#   [parent_0_child_0]
#     dims=1536  tokens=120  cost=$0.00001200
#     vector[:6] = [0.1235, -0.9877, 0.4568, 0.1111, -0.2222, 0.3333] ...

#   [parent_0_child_1]
#     dims=1536  tokens=110  cost=$0.00001100
#     vector[:6] = [0.5432, 0.1111, -0.2222, 0.9999, -0.8888, 0.7777] ...

# 🎯 Why This is Useful

# ✅ 1. Debugging
# Check if embeddings are generated correctly

# ✅ 2. Avoid huge output
# Full vector = 1000+ numbers ❌
# Preview = manageable ✅

# ✅ 3. Validate pipeline
# Check:
# token count
# cost
# dimensions

def print_vector_sample(results: list[EmbeddingResult], n: int = 2):
    print(f"\n  Sample vectors (first {n} child chunks):")
    child_results = [r for r in results if "child" in r.chunk_id][:n]
    for r in child_results:
        vec_preview = [round(v, 4) for v in r.vector[:6]]
        print(f"    [{r.chunk_id}]")
        print(f"      dims={r.dimensions}  tokens={r.token_count}  "
              f"cost=${r.cost_usd:.8f}")
        print(f"      vector[:6] = {vec_preview} ...")


# ─────────────────────────────────────────────────────────────────────────────
# MAIN
# ─────────────────────────────────────────────────────────────────────────────
def main():
    print("\n" + "="*65)
    print("  RAG EMBEDDING PIPELINE")
    print("  Strategy: Hierarchical Chunking (Parent-Child)")
    print("="*65)

    # ── Load & Chunk ──────────────────────────────────────────────────────────
    print("\n[1] Extracting text from PDF...")
    raw_text = extract_text(PDF_PATH)
    print(f"    Extracted {len(raw_text)} chars / {len(raw_text.split())} words")

    print("\n[2] Applying Hierarchical Chunking...")
    chunks = hierarchical_chunking(raw_text, child_size=200)
    parents  = [c for c in chunks if c.level == "parent"]
    children = [c for c in chunks if c.level == "child"]
    print(f"    Parents : {len(parents)}")
    print(f"    Children: {len(children)}")
    print(f"    Total   : {len(chunks)}")

    # Show chunk tree
    print("\n    Chunk Tree:")
    for p in parents:
        kids = [c for c in children if c.parent_id == p.chunk_id]
        print(f"    ├─ [PARENT] {p.chunk_id} | {p.section} | {p.word_count} words")
        for k in kids:
            print(f"    │    └─ [child] {k.chunk_id} | {k.word_count} words | "
                  f"{k.text[:50].strip()}...")

    # Embed only child chunks (parents stored separately for context retrieval)
    embed_chunks = children

    # ── MODEL COMPARISON TABLE ────────────────────────────────────────────────
    total_words = sum(c.word_count for c in embed_chunks)
    total_tokens = sum(approx_token_count(c.text) for c in embed_chunks)

    print(f"\n[3] Pre-flight Cost Estimation ({len(embed_chunks)} child chunks | ~{total_tokens} tokens)")
    print(f"\n  {'Model':<38} {'Dims':>5}  {'Cost/1K tok':>12}  {'Est. Cost':>12}  {'Type'}")
    print(f"  {'-'*85}")
    for key, info in EMBEDDING_MODELS.items():
        cost = (total_tokens / 1000) * info["cost_per_1k_tok"]
        cost_str = f"${cost:.6f}" if cost > 0 else "FREE"
        type_str = "API (paid)" if info["requires_api"] else "Local (free)"
        print(f"  {key:<38} {info['dimensions']:>5}  "
              f"${info['cost_per_1k_tok']:.5f}/1K  {cost_str:>12}  {type_str}")

    # ── PROVIDER 1: OpenAI ────────────────────────────────────────────────────
    print(f"\n{'='*65}")
    print(f"[4a] OPENAI — text-embedding-3-small")
    print(f"{'='*65}")
    results_oai_small = embed_openai(embed_chunks, model="text-embedding-3-small")
    print_cost_report(results_oai_small, "openai/text-embedding-3-small")
    print_vector_sample(results_oai_small)

    print(f"\n{'='*65}")
    print(f"[4b] OPENAI — text-embedding-3-large")
    print(f"{'='*65}")
    results_oai_large = embed_openai(embed_chunks, model="text-embedding-3-large")
    print_cost_report(results_oai_large, "openai/text-embedding-3-large")
    print_vector_sample(results_oai_large)

    # ── PROVIDER 2: HuggingFace ───────────────────────────────────────────────
    print(f"\n{'='*65}")
    print(f"[5a] HUGGINGFACE — all-MiniLM-L6-v2")
    print(f"{'='*65}")
    results_hf_mini = embed_huggingface(embed_chunks, model_name="all-MiniLM-L6-v2")
    print_cost_report(results_hf_mini, "huggingface/all-MiniLM-L6-v2")
    print_vector_sample(results_hf_mini)

    print(f"\n{'='*65}")
    print(f"[5b] HUGGINGFACE — all-mpnet-base-v2")
    print(f"{'='*65}")
    results_hf_mpnet = embed_huggingface(embed_chunks, model_name="all-mpnet-base-v2")
    print_cost_report(results_hf_mpnet, "huggingface/all-mpnet-base-v2")
    print_vector_sample(results_hf_mpnet)

    # ── PROVIDER 3: BGE ───────────────────────────────────────────────────────
    print(f"\n{'='*65}")
    print(f"[6a] BGE — BAAI/bge-small-en-v1.5")
    print(f"{'='*65}")
    results_bge_small = embed_bge(embed_chunks, model_name="BAAI/bge-small-en-v1.5")
    print_cost_report(results_bge_small, "bge/bge-small-en-v1.5")
    print_vector_sample(results_bge_small)

    print(f"\n{'='*65}")
    print(f"[6b] BGE — BAAI/bge-large-en-v1.5")
    print(f"{'='*65}")
    results_bge_large = embed_bge(embed_chunks, model_name="BAAI/bge-large-en-v1.5")
    print_cost_report(results_bge_large, "bge/bge-large-en-v1.5")
    print_vector_sample(results_bge_large)

    # ── FINAL COMPARISON SUMMARY ──────────────────────────────────────────────
    print(f"\n{'='*65}")
    print(f"  FINAL MODEL COMPARISON SUMMARY")
    print(f"  (for {len(embed_chunks)} child chunks from financial_report_2024.pdf)")
    print(f"{'='*65}")

    all_results = [
        ("OpenAI 3-small",    results_oai_small,  "openai/text-embedding-3-small"),
        ("OpenAI 3-large",    results_oai_large,  "openai/text-embedding-3-large"),
        ("HF MiniLM-L6",      results_hf_mini,    "huggingface/all-MiniLM-L6-v2"),
        ("HF mpnet-base",     results_hf_mpnet,   "huggingface/all-mpnet-base-v2"),
        ("BGE small-v1.5",    results_bge_small,  "bge/bge-small-en-v1.5"),
        ("BGE large-v1.5",    results_bge_large,  "bge/bge-large-en-v1.5"),
    ]

    print(f"\n  {'Model':<20} {'Dims':>5} {'Quality':<10} {'Cost (file)':>14} {'Cost (10K docs)':>16} {'Recommended For'}")
    print(f"  {'-'*90}")
    for label, results, mkey in all_results:
        info    = EMBEDDING_MODELS[mkey]
        cost    = sum(r.cost_usd for r in results)
        cost10k = cost * 10_000
        cost_str   = f"${cost:.6f}" if cost > 0 else "$0.000000"
        cost10k_str= f"${cost10k:,.2f}" if cost10k > 0 else "FREE"
        print(f"  {label:<20} {info['dimensions']:>5} {info['quality']:<10} "
              f"{cost_str:>14} {cost10k_str:>16}  {info['best_for'][:35]}")

    print(f"""
  ┌─ Recommendation Guide ──────────────────────────────────────────┐
  │                                                                  │
  │  🏆 Best Quality (paid)  : OpenAI text-embedding-3-large        │
  │     → 3072 dims, highest MTEB score, ideal for finance/legal    │
  │                                                                  │
  │  💰 Best Value (paid)    : OpenAI text-embedding-3-small        │
  │     → 6.5x cheaper than large, still excellent retrieval        │
  │                                                                  │
  │  🆓 Best Free (quality)  : BGE bge-large-en-v1.5               │
  │     → 1024 dims, beats HF models on all MTEB benchmarks        │
  │     → GPU recommended for production throughput                  │
  │                                                                  │
  │  ⚡ Best Free (speed)    : BGE bge-small-en-v1.5               │
  │     → 384 dims, CPU-friendly, near-large quality at 3x speed    │
  │                                                                  │
  │  📌 Production Pattern   : Hierarchical chunks (this script)    │
  │     → Store child vectors in Chroma/Pinecone/Weaviate           │
  │     → At query time: retrieve child → send parent to LLM        │
  └──────────────────────────────────────────────────────────────────┘
""")


if __name__ == "__main__":
    main()

Output :

(screenshots of the console output)

Note :
  • The code above has docstrings explaining both the design and the business logic
  • Pay attention to the models we are using, the design patterns involved, and the Python topics touched on (data classes, regular expressions, etc.)
  • The code is long, but if you go through it line by line, definition by definition, it becomes easy
  • I recommend that instead of reading the logic purely from a Python perspective, you try to understand what we get from it at the end - then it will make more sense.


Vector Store Vs Vector DB

A vector store or vector database is a specialized system that stores embeddings and retrieves semantically similar data using nearest-neighbor search.



We should have a clear idea of when to use a vector store vs a vector DB:

  • If the total number of vectors is small (roughly < 1,000), a vector store is enough
  • If the total number of vectors is larger (> 1,000), go for a vector DB
For production, always opt for a vector DB; a plain vector store shouldn't be used in production. Please closely observe the above 2 images for more detail.

Vector Store 

  • Stores only vectors
  • Fast similarity search
  • Simple + lightweight
  • Limited functionality 
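
The "stores only vectors + fast similarity search" idea can be sketched in a few lines of Python. This is a toy illustration, not a real library: an in-memory store that keeps nothing but (id, vector) pairs and answers cosine-similarity nearest-neighbor queries by brute force. All names here (`TinyVectorStore`, the sample doc ids) are made up for the example.

```python
import math

class TinyVectorStore:
    """Toy in-memory vector store: holds raw vectors only - no metadata, no text."""

    def __init__(self):
        self.vectors = []  # list of (id, vector) pairs

    def add(self, vec_id, vector):
        self.vectors.append((vec_id, vector))

    @staticmethod
    def _cosine(a, b):
        dot = sum(x * y for x, y in zip(a, b))
        na = math.sqrt(sum(x * x for x in a))
        nb = math.sqrt(sum(y * y for y in b))
        return dot / (na * nb)

    def search(self, query, top_k=2):
        # Brute-force similarity search over every stored vector
        scored = [(vec_id, self._cosine(query, v)) for vec_id, v in self.vectors]
        return sorted(scored, key=lambda s: s[1], reverse=True)[:top_k]

store = TinyVectorStore()
store.add("doc-1", [0.9, 0.1, 0.0])
store.add("doc-2", [0.0, 1.0, 0.0])
store.add("doc-3", [0.8, 0.2, 0.1])
print(store.search([1.0, 0.0, 0.0], top_k=2))
```

Notice what is missing: there is no metadata, no stored text, and no way to filter - exactly the "limited functionality" listed above, and exactly what a vector DB adds.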

Vector DB 

  • Stores vectors + metadata + text
    • Search is faster and more precise thanks to metadata
  • Filtering + hybrid search
    • Hybrid means similarity + keyword search
  • Scalable + full featured
  • Built for RAG and production applications


Try to understand the example below to see the power of adding metadata to a vector DB.

Suppose we are using a vector DB that can store metadata alongside the vectors, and we have stored vectors for 2 different vendors, i.e. vendor-1 and vendor-2. Now, if we need to search only vendor-2's vector information, we can simply apply a filter on vendor-2 and retrieve data related ONLY to vendor-2. This facility is not available if we use a plain vector store. That is the power of adding metadata to a vector DB.
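The vendor filter described above can be sketched like this. This is a hypothetical toy example, not a real vector-DB client (real databases such as Chroma, Pinecone, or Weaviate expose this as a metadata/`where` filter on the query): each record carries a metadata dict, and the search first narrows to matching records before running similarity.

```python
import math

def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    return dot / (math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b)))

# Each record stores vector + metadata + original text (what a vector DB keeps)
records = [
    {"id": "v1-inv", "vector": [0.9, 0.1], "metadata": {"vendor": "vendor-1"}, "text": "vendor-1 invoice"},
    {"id": "v2-inv", "vector": [0.8, 0.2], "metadata": {"vendor": "vendor-2"}, "text": "vendor-2 invoice"},
    {"id": "v2-po",  "vector": [0.1, 0.9], "metadata": {"vendor": "vendor-2"}, "text": "vendor-2 purchase order"},
]

def search(query, where=None, top_k=3):
    # 1) Metadata filter first - this is the step a plain vector store cannot do
    pool = [r for r in records
            if where is None or all(r["metadata"].get(k) == v for k, v in where.items())]
    # 2) Similarity search only over the filtered pool
    scored = sorted(pool, key=lambda r: cosine(query, r["vector"]), reverse=True)
    return [r["id"] for r in scored[:top_k]]

print(search([1.0, 0.0], where={"vendor": "vendor-2"}))  # → ['v2-inv', 'v2-po']
```

With the filter, vendor-1's record never enters the similarity search at all, which is both faster and safer than post-filtering the results.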

Hope you are now confident about choosing between a vector store and a vector DB.



Note : Carefully observe above images to understand the deciding factors for a Vector DB.



Indexing 

Indexing in a vector DB means organizing embeddings into a search structure so that nearest-neighbor retrieval is much faster than scanning every vector.

We have 3 common types of indexing, as below:

  • Flat Indexing (exact, brute-force comparison against every vector)
  • IVF-PQ (Inverted File index with Product Quantization - approximate, memory-efficient)
  • HNSW (Hierarchical Navigable Small World graphs - fast, approximate graph search)
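
As a taste of how non-flat indexing speeds things up, here is a toy sketch of the inverted-file (IVF) idea only: vectors are grouped under coarse centroids, and a query scans just the closest bucket(s) instead of every vector. This is illustrative pseudocode-style Python with made-up data; real IVF-PQ additionally compresses the vectors with product quantization, which is omitted here.

```python
import math

def dist(a, b):
    return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))

# Toy IVF ("inverted file") layout: vectors grouped under coarse centroids
centroids = {"bucket-A": [1.0, 0.0], "bucket-B": [0.0, 1.0]}
buckets = {
    "bucket-A": [("doc-1", [0.9, 0.1]), ("doc-2", [0.8, 0.3])],
    "bucket-B": [("doc-3", [0.1, 0.9]), ("doc-4", [0.2, 0.8])],
}

def ivf_search(query, nprobe=1):
    # 1) Coarse step: pick the nprobe closest centroids
    nearest = sorted(centroids, key=lambda c: dist(query, centroids[c]))[:nprobe]
    # 2) Scan only those buckets - flat indexing would scan every vector
    candidates = [item for c in nearest for item in buckets[c]]
    return min(candidates, key=lambda item: dist(query, item[1]))[0]

print(ivf_search([0.95, 0.05]))  # → doc-1
```

With 2 buckets the query touches only half the vectors; with thousands of buckets the saving becomes dramatic, at the cost of occasionally missing the true nearest neighbor (hence "approximate").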

I will add indexing information by tomorrow EOD.


Thank you for reading this blog !

Arun Mathe
