Learning Guide -- Advanced¶
CAR-T Intelligence Agent: Deep Internals and Extension Guide¶
Author: Adam Jones
Date: March 2026
Codebase Version: 21,259 lines of Python across 61 files
Audience: Experienced developers who want to understand the internals and extend the system
Prerequisites¶
Before starting this guide, you should have:
- Completed the Foundations guide -- you understand what the CAR-T Intelligence Agent does, how to run it, and how to issue queries through the UI and API.
- Python proficiency -- you are comfortable with Pydantic v2, asyncio, decorators, abstract base classes, and concurrent.futures.
- Basic ML/NLP concepts -- you know what embeddings are, what cosine similarity measures, and how retrieval-augmented generation works at a high level.
- Vector database basics -- you understand that Milvus stores high-dimensional vectors and retrieves the nearest neighbors for a query vector.
- Development environment -- you have the repo cloned, dependencies installed, and can run pytest tests/ successfully (415 tests, all passing).
Codebase map for reference:
cart_intelligence_agent/
src/ # 12,944 lines -- core engine
rag_engine.py # 754 lines -- multi-collection RAG
collections.py # 1,004 lines -- Milvus schema + manager
models.py # 484 lines -- Pydantic models + enums
knowledge.py # 2,249 lines -- knowledge graph (6 dictionaries)
query_expansion.py # 1,592 lines -- 12 expansion maps
agent.py # 309 lines -- autonomous agent
export.py # 1,487 lines -- Markdown/JSON/PDF export
metrics.py # 404 lines -- Prometheus integration
scheduler.py # 226 lines -- APScheduler weekly refresh
ingest/ # ~4,400 lines -- 15 ingest parsers
utils/ # pubmed_client.py
app/ # 1,162 lines -- Streamlit UI
cart_ui.py
api/ # 1,033 lines -- FastAPI REST API
main.py
config/
settings.py # 113 lines -- Pydantic BaseSettings
tests/ # 4,321 lines -- 415 tests
scripts/ # 1,686 lines -- seed + setup utilities
Chapter 1: Deep Dive into the RAG Engine¶
The RAG engine (src/rag_engine.py, 754 lines) is the central nervous system of the agent. Every query -- whether from the Streamlit UI, the FastAPI endpoint, or the autonomous agent -- flows through CARTRAGEngine.
1.1 The CARTRAGEngine Class¶
class CARTRAGEngine:
def __init__(self, collection_manager, embedder, llm_client,
knowledge=None, query_expander=None):
self.collections = collection_manager
self.embedder = embedder
self.llm = llm_client
self.knowledge = knowledge
self.expander = query_expander
Five dependencies are injected at construction time. This is a deliberate design choice: every external service (Milvus, the embedding model, the LLM, the knowledge graph, and the query expansion module) is injected rather than imported directly. This makes the engine fully testable with mocks (see Chapter 9).
1.2 The retrieve() Method -- Line by Line¶
retrieve() is the most important method in the entire codebase. Here is the complete execution flow:
Step 1: Prepare the search text.
top_k = top_k_per_collection or settings.TOP_K_PER_COLLECTION # default: 5
start = time.time()
search_text = query.question
if conversation_context:
search_text = f"{conversation_context}\n\nCurrent question: {query.question}"
When conversation memory is active (the UI tracks the last MAX_CONVERSATION_CONTEXT=3 exchanges), the prior context is prepended to the current question. This changes the embedding to account for conversational continuity -- a follow-up question like "What about its toxicity?" embeds differently when the prior context mentions "Yescarta."
Step 2: Embed the query.
This calls _embed_query(), which prepends the BGE instruction prefix:
def _embed_query(self, text: str):
prefix = "Represent this sentence for searching relevant passages: "
return self.embedder.embed_text(prefix + text)
The prefix is critical. BGE-small-en-v1.5 is an asymmetric embedding model trained with instruction-prefix pairs. Without the prefix, retrieval quality drops measurably (typically 5-10% lower recall). Documents are embedded without any prefix -- only queries get the prefix.
Step 3: Build per-collection filter expressions.
filter_exprs = {}
for coll in collections_to_search:
parts = []
cfg = COLLECTION_CONFIG.get(coll, {})
if query.target_antigen and cfg.get("has_target_antigen"):
parts.append(f'target_antigen == "{query.target_antigen}"')
year_field = cfg.get("year_field")
if year_field:
if year_min:
parts.append(f'{year_field} >= {year_min}')
if year_max:
parts.append(f'{year_field} <= {year_max}')
if parts:
filter_exprs[coll] = " and ".join(parts)
This builds Milvus boolean expressions per collection. Not every collection has a target_antigen field (e.g., cart_manufacturing and cart_safety do not -- see has_target_antigen in COLLECTION_CONFIG). Not every collection has a year_field (only cart_literature has year and cart_trials has start_year). The filter expressions are passed to Milvus so it applies them during the ANN search, not after.
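To see what these expressions look like concretely, here is a standalone sketch of the same loop with a trimmed, illustrative `COLLECTION_CONFIG` (the `build_filters` helper and the config values are hypothetical stand-ins for the real engine code):

```python
# Trimmed stand-in for COLLECTION_CONFIG -- illustrative values only.
COLLECTION_CONFIG = {
    "cart_literature": {"has_target_antigen": True, "year_field": "year"},
    "cart_trials": {"has_target_antigen": True, "year_field": "start_year"},
    "cart_safety": {"has_target_antigen": False, "year_field": None},
}

def build_filters(target_antigen, year_min, year_max, collections):
    """Build per-collection Milvus boolean filter expressions."""
    filter_exprs = {}
    for coll in collections:
        parts = []
        cfg = COLLECTION_CONFIG.get(coll, {})
        if target_antigen and cfg.get("has_target_antigen"):
            parts.append(f'target_antigen == "{target_antigen}"')
        year_field = cfg.get("year_field")
        if year_field:
            if year_min:
                parts.append(f"{year_field} >= {year_min}")
            if year_max:
                parts.append(f"{year_field} <= {year_max}")
        if parts:
            filter_exprs[coll] = " and ".join(parts)
    return filter_exprs

filters = build_filters("CD19", 2020, None, list(COLLECTION_CONFIG))
# cart_safety has neither field, so it gets no filter entry at all.
```

Note that a collection with no applicable fields simply has no entry in the dict, so its ANN search runs unfiltered.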
Step 4: Parallel search across all collections.
all_hits = self._search_all_collections(
query_embedding, collections_to_search, top_k, filter_exprs,
)
Inside _search_all_collections(), the engine calls self.collections.search_all(), which uses ThreadPoolExecutor with max_workers=len(collections) (11 threads) to search all collections simultaneously. Each thread calls collection.search() on Milvus.
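A minimal sketch of that fan-out pattern (the `search_all` helper and its signature here are illustrative stand-ins, not the actual collection manager API):

```python
from concurrent.futures import ThreadPoolExecutor

def search_all(collections, search_fn):
    """Fan one query out across all collections in parallel.

    `search_fn(name)` stands in for a Milvus collection.search() call;
    one worker thread per collection, as described above.
    """
    with ThreadPoolExecutor(max_workers=len(collections)) as pool:
        futures = {name: pool.submit(search_fn, name) for name in collections}
        # .result() blocks until each thread finishes, so the wall-clock
        # time is bounded by the slowest collection.
        return {name: fut.result() for name, fut in futures.items()}

# Toy usage: each "search" just echoes its collection name.
hits = search_all(["cart_literature", "cart_trials"], lambda n: [f"{n}-hit"])
```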
Step 5: Query expansion.
if self.expander:
expanded_hits = self._expanded_search(
query.question, query_embedding, collections_to_search, top_k,
)
all_hits.extend(expanded_hits)
The expansion system (Chapter 6) generates additional search terms. The key insight in _expanded_search() is the bifurcated strategy:
- If an expansion term is a known target antigen (checked against _KNOWN_ANTIGENS), it is used as a Milvus field filter: target_antigen == "CD19". This is a precise, metadata-driven search.
- If the expansion term is NOT an antigen (e.g., "cytokine release syndrome"), it is re-embedded and used for a semantic search across all collections. Expansion hits receive a score penalty (0.7x multiplier for semantic, 0.8x for antigen-filtered) to prevent expansion results from overwhelming direct hits.
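The routing can be sketched as follows (everything here, including `_KNOWN_ANTIGENS` and the two search callbacks, is a hypothetical stand-in for the real `_expanded_search()` internals):

```python
# Toy antigen set; the real _KNOWN_ANTIGENS is much larger.
_KNOWN_ANTIGENS = {"CD19", "BCMA", "CD22"}

def route_expansion_term(term, antigen_search, semantic_search):
    """Route one expansion term to a filter search or a semantic search."""
    if term.upper() in _KNOWN_ANTIGENS:
        hits = antigen_search(f'target_antigen == "{term.upper()}"')
        penalty = 0.8   # precise metadata filter: milder penalty
    else:
        hits = semantic_search(term)
        penalty = 0.7   # re-embedded semantic search: stronger penalty
    return [(hit_id, score * penalty) for hit_id, score in hits]

hits = route_expansion_term(
    "cd19",
    antigen_search=lambda expr: [("lit-1", 0.9)],
    semantic_search=lambda text: [("lit-2", 0.9)],
)
```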
Step 6: Merge, deduplicate, and rank.
def _merge_and_rank(self, hits: List[SearchHit]) -> List[SearchHit]:
seen = set()
unique = []
for hit in hits:
if hit.id not in seen:
seen.add(hit.id)
unique.append(hit)
unique.sort(key=lambda h: h.score, reverse=True)
return unique[:30]
Dedup is by record ID (hit.id). The cap of 30 is a hard limit to prevent prompt bloat -- the LLM context window should not be overwhelmed with low-relevance evidence.
Step 7: Knowledge graph augmentation.
knowledge_context = ""
if self.knowledge:
knowledge_context = self._get_knowledge_context(query.question)
_get_knowledge_context() is a keyword-matching router that scans the query for mentions of target antigens, toxicities, manufacturing processes, biomarkers, regulatory products, and immunogenicity topics. It calls the appropriate get_*_context() function from knowledge.py for each match. The knowledge context is injected into the LLM prompt alongside the vector search evidence.
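A toy version of that keyword router, assuming a simplified domain table and stand-in context functions (the real router in rag_engine.py covers six knowledge domains):

```python
# Illustrative domain -> keyword table; not the real routing table.
DOMAIN_KEYWORDS = {
    "toxicity": ["CRS", "ICANS", "NEUROTOXICITY"],
    "antigen": ["CD19", "BCMA"],
}

def route_query(question, context_fns):
    """Call the matching get_*_context() stand-in for each matched domain."""
    query_upper = question.upper()
    sections = []
    for domain, keywords in DOMAIN_KEYWORDS.items():
        if any(kw in query_upper for kw in keywords):
            sections.append(context_fns[domain](question))
    return "\n\n".join(s for s in sections if s)

ctx = route_query(
    "How is CRS managed after CD19 CAR-T?",
    {"toxicity": lambda q: "## Toxicity: CRS", "antigen": lambda q: "## Target: CD19"},
)
```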
1.3 Score Weighting Math¶
Inside _search_all_collections(), each raw Milvus cosine similarity score is adjusted:

adjusted_score = raw_score * (1 + weight)

where weight comes from COLLECTION_CONFIG (sourced from settings.py). The default weights are:
| Collection | Weight | Effective Multiplier |
|---|---|---|
| Literature | 0.20 | 1.20 |
| Trials | 0.16 | 1.16 |
| Constructs | 0.10 | 1.10 |
| Assays | 0.09 | 1.09 |
| Safety | 0.08 | 1.08 |
| Biomarkers | 0.08 | 1.08 |
| Manufacturing | 0.07 | 1.07 |
| RealWorld | 0.07 | 1.07 |
| Regulatory | 0.06 | 1.06 |
| Sequences | 0.06 | 1.06 |
| Genomic | 0.04 | 1.04 |
This means a literature hit with a raw score of 0.80 becomes 0.80 * 1.20 = 0.96, while a genomic hit with the same raw score becomes 0.80 * 1.04 = 0.832. Literature and trials are intentionally boosted because they contain the most dense, curated information.
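The adjustment reduces to a one-liner; assuming adjusted = raw * (1 + weight), which matches the "Effective Multiplier" column above:

```python
def adjust(raw_score: float, weight: float) -> float:
    # Effective multiplier is 1 + weight, per the table above.
    return raw_score * (1 + weight)

lit = adjust(0.80, 0.20)   # literature hit
gen = adjust(0.80, 0.04)   # genomic hit with the same raw score
```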
1.4 Citation Relevance Scoring¶
Each hit receives a relevance tier:
if raw_score >= settings.CITATION_HIGH_THRESHOLD: # 0.75
relevance = "high"
elif raw_score >= settings.CITATION_MEDIUM_THRESHOLD: # 0.60
relevance = "medium"
else:
relevance = "low"
These tiers are surfaced in the prompt as [high relevance], [medium relevance], or [low relevance] tags, and the prompt instructs the LLM to "prioritize [high relevance] citations."
1.5 The CART_SYSTEM_PROMPT¶
The system prompt covers 12 CAR-T domains and establishes the agent's behavioral rules:
CART_SYSTEM_PROMPT = """You are a CAR-T cell therapy intelligence agent with deep expertise in:
1. **Target Identification** — antigen biology, expression profiling, tumor specificity
2. **CAR Design** — scFv selection, costimulatory domains (CD28 vs 4-1BB), signaling architecture
3. **Vector Engineering** — lentiviral/retroviral production, transduction efficiency, VCN optimization
4. **In Vitro & In Vivo Testing** — cytotoxicity assays, cytokine profiling, mouse models, persistence
5. **Clinical Development** — trial design, response rates, toxicity management (CRS, ICANS)
6. **Manufacturing** — leukapheresis, T-cell expansion, cryopreservation, release testing, CMC
7. **Safety & Pharmacovigilance** — post-market safety signals, REMS, long-term follow-up, FAERS
8. **Biomarkers** — CRS prediction (ferritin, CRP, IL-6), response biomarkers, MRD monitoring
9. **Regulatory Intelligence** — FDA approval pathways, BLA timelines, breakthrough therapy, RMAT, EMA
10. **Molecular Design** — scFv binding affinity, CDR sequences, humanization, nanobodies
11. **Real-World Evidence** — registry outcomes (CIBMTR), community vs academic, special populations
12. **Genomic Evidence** — patient variant data, ClinVar, AlphaMissense pathogenicity
...
The prompt explicitly instructs the LLM to cite evidence using clickable markdown links, think cross-functionally, highlight failure modes, and acknowledge uncertainty. This is the personality of the agent.
1.6 Prompt Construction¶
_build_prompt() assembles the final prompt from three layers:
- Evidence sections -- grouped by collection, with citation links and relevance tags
- Knowledge graph context -- injected only if _get_knowledge_context() returned non-empty text
- Instructions -- the question plus grounding instructions
return (
f"## Retrieved Evidence\n\n"
f"{evidence_text}"
f"{knowledge_text}\n\n"
f"---\n\n"
f"## Question\n\n"
f"{question}\n\n"
f"Please provide a comprehensive answer grounded in the evidence above. "
f"Cite sources using the clickable markdown links provided in each evidence item. "
f"Prioritize [high relevance] citations. "
f"Consider cross-functional insights across all stages of CAR-T development."
)
Each evidence item is formatted as:
1. [Literature:PMID 12345678](https://pubmed.ncbi.nlm.nih.gov/12345678/) [high relevance] (score=0.923) CD19 CAR-T therapy achieves...
The _format_citation() static method generates clickable PubMed URLs for literature hits (when the ID is numeric) and ClinicalTrials.gov URLs for trial hits (when the ID starts with "NCT"). All other collections use the [Collection:ID] format.
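A hedged sketch of that logic (the real static method in rag_engine.py may differ in signature, and the ClinicalTrials.gov URL template here is illustrative):

```python
def format_citation(collection: str, hit_id: str) -> str:
    """Return a clickable markdown citation for a search hit."""
    if collection == "Literature" and hit_id.isdigit():
        # Numeric literature IDs are treated as PubMed PMIDs.
        return f"[Literature:PMID {hit_id}](https://pubmed.ncbi.nlm.nih.gov/{hit_id}/)"
    if collection == "Trials" and hit_id.startswith("NCT"):
        # Illustrative ClinicalTrials.gov URL shape.
        return f"[Trials:{hit_id}](https://clinicaltrials.gov/study/{hit_id})"
    # All other collections fall back to the plain [Collection:ID] form.
    return f"[{collection}:{hit_id}]"
```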
Chapter 2: Vector Search Internals¶
2.1 How IVF_FLAT Works¶
The agent uses IVF_FLAT (Inverted File with Flat quantization) as the Milvus index type. Here is what happens at each stage:
Index building (at collection creation time):
IVF_FLAT partitions the vector space into nlist=1024 Voronoi cells using k-means clustering. Each vector is assigned to the cell whose centroid is closest. The "FLAT" part means vectors within each cell are stored without compression -- no quantization loss.
Search (at query time):
At query time, the system finds the nprobe=16 closest cluster centroids to the query vector, then performs brute-force comparison against all vectors within those 16 cells. This means the search examines roughly 16/1024 = 1.56% of the total vectors, providing a massive speedup over brute-force while maintaining high recall.
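In pymilvus terms, the index and search parameters described above would typically be expressed as the following dicts (values match the nlist=1024, nprobe=16, COSINE setup described in this chapter):

```python
# Index parameters, passed at collection creation time.
index_params = {
    "metric_type": "COSINE",
    "index_type": "IVF_FLAT",
    "params": {"nlist": 1024},   # number of k-means cells
}

# Search parameters, passed per query.
search_params = {
    "metric_type": "COSINE",
    "params": {"nprobe": 16},    # cells probed per query
}

probed_fraction = 16 / 1024      # fraction of vectors examined
```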
2.2 Why COSINE over L2 or IP¶
Three common distance metrics for vector search:
| Metric | Formula | Range | Use Case |
|---|---|---|---|
| COSINE | 1 - cos(a,b) | [0, 2] | Normalized similarity, direction matters |
| L2 (Euclidean) | sqrt(sum((a-b)^2)) | [0, inf) | Magnitude-sensitive |
| IP (Inner Product) | sum(a*b) | (-inf, inf) | Pre-normalized vectors |
COSINE was chosen because BGE-small-en-v1.5 embeddings are not pre-normalized. Cosine similarity measures the angle between vectors, ignoring magnitude. This matters for text embeddings, where magnitude varies with text length -- a short query should still match a long document chunk. Milvus reports the similarity directly (higher is better); in practice, scores for these text embeddings range from near 0 (orthogonal, unrelated) to 1 (identical direction, highly similar).
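The metric itself is easy to verify by hand with a self-contained implementation:

```python
import math

def cosine_similarity(a, b):
    """Cosine of the angle between two vectors, ignoring magnitude."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(x * x for x in b))
    return dot / norm

# Scaling a vector does not change its direction, so similarity stays 1.0.
sim_parallel = cosine_similarity([1.0, 2.0], [2.0, 4.0])
# Orthogonal vectors score 0.0 regardless of magnitude.
sim_orthogonal = cosine_similarity([1.0, 0.0], [0.0, 3.0])
```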
2.3 Why nprobe=16¶
The nprobe parameter controls the recall-latency tradeoff:
- nprobe=1: Search only the single closest cluster. Very fast (~0.5ms per collection), but recall drops to ~60-70%.
- nprobe=16: Search 16 clusters. Moderate latency (~2-5ms per collection), recall ~95-98%.
- nprobe=1024: Search all clusters (equivalent to brute force). Recall is 100%, but latency scales linearly with data size.
With 11 collections being searched in parallel, the per-collection latency of 2-5ms means the total search wall-clock time is typically 5-15ms (limited by the slowest collection). The nprobe=16 value was chosen empirically to keep P99 latency under 50ms while maintaining recall above 95%.
2.4 The BGE Embedding Prefix Trick¶
BGE-small-en-v1.5 is an asymmetric bi-encoder model. It was trained with instruction prefixes for queries but NOT for documents:
# Query embedding (with prefix)
def _embed_query(self, text: str):
prefix = "Represent this sentence for searching relevant passages: "
return self.embedder.embed_text(prefix + text)
# Document embedding (no prefix -- done in ingest pipelines)
texts = [record.to_embedding_text() for record in batch]
embeddings = self.embedder.encode(texts) # No prefix
The prefix shifts the query embedding into a region of the vector space that better aligns with relevant document passages. Without the prefix, the model treats the input as a document rather than a query, reducing the quality of retrieval.
2.5 How 384 Dimensions Capture Semantics¶
BGE-small-en-v1.5 maps text into a 384-dimensional vector space. Each dimension encodes a learned semantic feature. The model was trained on 1 billion+ text pairs, learning to place semantically similar texts close together in this space.
For this application:

- "CRS management" and "cytokine release syndrome treatment" have cosine similarity ~0.87
- "CRS management" and "lentiviral transduction efficiency" have cosine similarity ~0.15
- "CD19 CAR-T efficacy in B-ALL" and "tisagenlecleucel response rate in pediatric leukemia" have cosine similarity ~0.82
The 384-dimensional space is sufficient for biomedical text. Larger models (768-dim, 1024-dim) provide marginal improvements but significantly increase storage and latency. At 384 dimensions with float32, each vector consumes 1,536 bytes -- for a collection with 10,000 records, the index is roughly 15MB.
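The storage math checks out directly:

```python
dim = 384
bytes_per_vector = dim * 4                  # float32 = 4 bytes per dimension
index_bytes = bytes_per_vector * 10_000     # 10,000 records
index_mb = index_bytes / 1_000_000          # ~15 MB, as stated above
```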
Chapter 3: Adding a New Collection¶
This chapter walks through a complete worked example: adding a hypothetical cart_imaging collection for storing CAR-T cell imaging data (PET-CT, MRI, flow imaging).
Step 1: Define the Schema in collections.py¶
Open src/collections.py and add a new schema definition alongside the existing ones:
# ── cart_imaging ──────────────────────────────────────────────────────
IMAGING_FIELDS = [
FieldSchema(
name="id",
dtype=DataType.VARCHAR,
is_primary=True,
max_length=100,
description="Imaging record identifier",
),
FieldSchema(
name="embedding",
dtype=DataType.FLOAT_VECTOR,
dim=EMBEDDING_DIM,
description="BGE-small-en-v1.5 text embedding",
),
FieldSchema(
name="text_summary",
dtype=DataType.VARCHAR,
max_length=3000,
description="Description of imaging finding for embedding",
),
FieldSchema(
name="modality",
dtype=DataType.VARCHAR,
max_length=30,
description="PET-CT, MRI, flow_imaging, bioluminescence",
),
FieldSchema(
name="target_antigen",
dtype=DataType.VARCHAR,
max_length=100,
description="Target antigen of the CAR-T product imaged",
),
FieldSchema(
name="timepoint",
dtype=DataType.VARCHAR,
max_length=50,
description="e.g., day 7, day 28, month 3",
),
FieldSchema(
name="finding",
dtype=DataType.VARCHAR,
max_length=500,
description="Key imaging finding",
),
FieldSchema(
name="product",
dtype=DataType.VARCHAR,
max_length=200,
description="CAR-T product name",
),
FieldSchema(
name="patient_id",
dtype=DataType.VARCHAR,
max_length=50,
description="Anonymized patient identifier",
),
]
IMAGING_SCHEMA = CollectionSchema(
fields=IMAGING_FIELDS,
description="CAR-T cell imaging and biodistribution data",
)
Then register it in COLLECTION_SCHEMAS:
COLLECTION_SCHEMAS: Dict[str, CollectionSchema] = {
# ... existing 11 entries ...
"cart_imaging": IMAGING_SCHEMA,
}
Step 2: Create the Pydantic Model in models.py¶
Add a new enum for imaging modality and a new model class:
class ImagingModality(str, Enum):
PET_CT = "pet_ct"
MRI = "mri"
FLOW_IMAGING = "flow_imaging"
BIOLUMINESCENCE = "bioluminescence"
SPECT = "spect"
class ImagingRecord(BaseModel):
"""CAR-T cell imaging record -- maps to cart_imaging collection."""
id: str = Field(..., max_length=100)
text_summary: str = Field(..., max_length=3000)
modality: ImagingModality = ImagingModality.PET_CT
target_antigen: str = Field("", max_length=100)
timepoint: str = Field("", max_length=50)
finding: str = Field("", max_length=500)
product: str = Field("", max_length=200)
patient_id: str = Field("", max_length=50)
def to_embedding_text(self) -> str:
parts = [self.text_summary]
if self.modality:
parts.append(f"Modality: {self.modality.value}")
if self.finding:
parts.append(f"Finding: {self.finding}")
if self.product:
parts.append(f"Product: {self.product}")
return " ".join(parts)
Step 3: Register the Model in collections.py¶
Add the import and the mapping:
from src.models import (
# ... existing imports ...
ImagingRecord,
)
COLLECTION_MODELS: Dict[str, type] = {
# ... existing 11 entries ...
"cart_imaging": ImagingRecord,
}
Step 4: Add to COLLECTION_CONFIG in rag_engine.py¶
COLLECTION_CONFIG = {
# ... existing 11 entries ...
"cart_imaging": {
"weight": settings.WEIGHT_IMAGING,
"label": "Imaging",
"has_target_antigen": True,
"year_field": None,
},
}
Step 5: Add the Weight to settings.py¶
Adjust the other weights so they still sum to approximately 1.0.
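A minimal sketch of the new setting (the field name WEIGHT_IMAGING matches the reference in COLLECTION_CONFIG from Step 4; the default of 0.04 is illustrative, not prescribed):

```python
# In config/settings.py, alongside the existing WEIGHT_* fields.
# Illustrative default -- rebalance the other weights so the total
# still sums to approximately 1.0.
WEIGHT_IMAGING: float = 0.04
```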
Step 6: Create the Ingest Parser¶
Create src/ingest/imaging_parser.py:
"""Imaging data ingest pipeline for CAR-T Intelligence Agent."""
from typing import Any, List
from loguru import logger
from pydantic import BaseModel
from src.collections import CARTCollectionManager
from src.models import ImagingRecord
from .base import BaseIngestPipeline
class ImagingIngestPipeline(BaseIngestPipeline):
"""Ingest CAR-T imaging data from structured files or APIs."""
DEFAULT_COLLECTION = "cart_imaging"
def fetch(self, **kwargs) -> Any:
"""Fetch imaging data from the configured source."""
# Implement your data fetching logic here
# Could read from CSV, a PACS API, or a FHIR server
raise NotImplementedError("Implement fetch() for your data source")
def parse(self, raw_data: Any) -> List[BaseModel]:
"""Parse raw imaging data into ImagingRecord instances."""
records = []
for item in raw_data:
try:
record = ImagingRecord(
id=item["id"],
text_summary=item["description"],
modality=item.get("modality", "pet_ct"),
target_antigen=item.get("target", ""),
timepoint=item.get("timepoint", ""),
finding=item.get("finding", ""),
product=item.get("product", ""),
patient_id=item.get("patient_id", ""),
)
records.append(record)
except Exception as e:
logger.warning(f"Failed to parse imaging record: {e}")
return records
def run(self, collection_name=None, batch_size=32, **kwargs) -> int:
collection_name = collection_name or self.DEFAULT_COLLECTION
return super().run(collection_name, batch_size, **kwargs)
Step 7: Add Export Format to export.py¶
In the _format_evidence_table() function, add a new branch:
elif collection_name == "Imaging":
lines.append("| # | ID | Score | Modality | Product | Timepoint | Finding |")
lines.append("|---|-----|-------|----------|---------|-----------|---------|")
for i, hit in enumerate(hits[:10], 1):
m = hit.metadata
modality = m.get("modality", "")
product = m.get("product", "")[:20]
timepoint = m.get("timepoint", "")
finding = m.get("finding", "")[:40]
lines.append(f"| {i} | {hit.id} | {hit.score:.3f} | {modality} | {product} | {timepoint} | {finding} |")
Do the same in _build_pdf_evidence_table() for the PDF export.
Step 8: Add UI Toggle and Badge in cart_ui.py¶
In the sidebar collection filter section, add "cart_imaging" to the list of selectable collections. In the evidence display section, add appropriate badge styling for "Imaging" results.
Step 9: Add Test Fixtures¶
In tests/conftest.py, update mock_collection_manager:
Add a sample imaging hit to sample_search_hits:
SearchHit(
collection="Imaging",
id="img-pet-001",
score=0.76,
text="PET-CT at day 28 shows CAR-T cell trafficking to tumor site.",
metadata={"modality": "pet_ct", "product": "Yescarta", "timepoint": "day 28"},
),
Step 10: Run Tests¶
cd /home/adam/projects/hcls-ai-factory/ai_agent_adds/cart_intelligence_agent
python -m pytest tests/ -v
Verify all 415 existing tests still pass, and write new tests for the imaging model, parser, and export format.
Chapter 4: Building a Custom Ingest Pipeline¶
4.1 The BaseIngestPipeline ABC¶
All ingest pipelines inherit from BaseIngestPipeline in src/ingest/base.py. The contract is:
class BaseIngestPipeline(ABC):
def __init__(self, collection_manager, embedder):
self.collection_manager = collection_manager
self.embedder = embedder
@abstractmethod
def fetch(self, **kwargs) -> Any:
"""Retrieve raw data from the upstream source."""
...
@abstractmethod
def parse(self, raw_data: Any) -> List[BaseModel]:
"""Convert raw data into validated Pydantic model instances."""
...
def embed_and_store(self, records, collection_name, batch_size=32) -> int:
"""Embed and insert into Milvus (provided by base class)."""
...
def run(self, collection_name=None, batch_size=32, **fetch_kwargs) -> int:
"""Orchestrate: fetch -> parse -> embed_and_store."""
...
You implement fetch() and parse(). The base class provides embed_and_store() and run().
4.2 The fetch() -> parse() -> embed_and_store() Pattern¶
The pipeline runs in three strict phases:
1. fetch() -- I/O phase. Makes API calls, reads files, or queries databases. Returns raw data in whatever format the source provides (XML, JSON, CSV rows, etc.).

2. parse() -- Validation phase. Converts raw data into typed Pydantic model instances. This is where you extract fields, classify records, and enforce data quality. Returns a List[BaseModel].

3. embed_and_store() -- Embedding + storage phase. Provided by the base class. For each record, it:
   - Calls record.to_embedding_text() to get the text to embed
   - Batches texts (default batch_size=32) and calls self.embedder.encode(texts)
   - Converts Pydantic models to dicts via .model_dump()
   - Converts Enum values to their string .value (Milvus requires plain strings)
   - Truncates UTF-8 strings to respect VARCHAR byte limits
   - Inserts into Milvus via self.collection_manager.insert_batch()
4.3 Batch Embedding¶
The base class processes records in batches of 32 by default:
for i in range(0, len(records), batch_size):
batch = records[i : i + batch_size]
texts = [record.to_embedding_text() for record in batch]
embeddings = self.embedder.encode(texts)
# ... insert batch
Batching is important for GPU utilization. BGE-small-en-v1.5 on a DGX Spark can embed ~500 texts/second in batches of 32, but only ~50 texts/second one at a time.
4.4 VARCHAR Truncation¶
Milvus enforces strict byte limits on VARCHAR fields. The base class handles this:
for key, value in record_dict.items():
if isinstance(value, Enum):
record_dict[key] = value.value
elif isinstance(value, str):
encoded = value.encode("utf-8")
if len(encoded) > 2990 and key in ("text_chunk", "text_summary"):
record_dict[key] = encoded[:2990].decode("utf-8", errors="ignore")
elif len(encoded) > 490 and key in ("title", "name", "known_toxicities"):
record_dict[key] = encoded[:490].decode("utf-8", errors="ignore")
The limits are 2990 bytes (not 3000) and 490 bytes (not 500) to leave a small buffer. UTF-8 characters can be up to 4 bytes, so truncating at byte boundaries with errors="ignore" safely drops any incomplete multi-byte characters at the boundary.
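A quick demonstration of why byte-boundary truncation with errors="ignore" is safe: "€" encodes to 3 bytes in UTF-8, so a cut at byte 2990 lands mid-character, and the incomplete trailing bytes are silently dropped rather than raising a decode error.

```python
s = "€" * 1000                                  # 3,000 bytes when encoded
encoded = s.encode("utf-8")
truncated = encoded[:2990].decode("utf-8", errors="ignore")
# 2990 // 3 = 996 whole characters survive; the 2 stray bytes of the
# 997th character are discarded by errors="ignore".
n_chars = len(truncated)
n_bytes = len(truncated.encode("utf-8"))
```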
4.5 Worked Example: Building a FAERS Ingest Pipeline¶
Here is how the FAERS (FDA Adverse Event Reporting System) pipeline would be structured:
class FAERSIngestPipeline(BaseIngestPipeline):
DEFAULT_COLLECTION = "cart_safety"
def fetch(self, product_names=None, start_date=None, **kwargs):
"""Fetch adverse event reports from FAERS API."""
# Build API query for CAR-T products
products = product_names or [
"KYMRIAH", "YESCARTA", "TECARTUS",
"BREYANZI", "ABECMA", "CARVYKTI"
]
results = []
for product in products:
response = requests.get(
f"https://api.fda.gov/drug/event.json",
params={
"search": f'patient.drug.openfda.brand_name:"{product}"',
"limit": 100,
}
)
if response.ok:
results.extend(response.json().get("results", []))
return results
def parse(self, raw_data):
"""Convert FAERS JSON into SafetyRecord instances."""
records = []
for event in raw_data:
record = SafetyRecord(
id=f"faers-{event['safetyreportid']}",
text_summary=self._build_summary(event),
product=self._extract_product(event),
event_type=self._classify_event(event),
severity_grade=event.get("serious", "unknown"),
reporting_source="FAERS",
year=int(event.get("receiptdate", "0000")[:4]) or 0,
)
records.append(record)
return records
def run(self, collection_name=None, batch_size=32, **kwargs):
collection_name = collection_name or self.DEFAULT_COLLECTION
return super().run(collection_name, batch_size, **kwargs)
4.6 Error Handling¶
The base class wraps each batch insertion in a try/except and continues to the next batch on failure:
except Exception as exc:
logger.error(
f"Failed batch {i // batch_size + 1} "
f"({i}-{i + len(batch)}) into '{collection_name}': {exc}"
)
continue
This means a single corrupt record in a batch will fail that entire batch but not the rest of the pipeline. If you need per-record error isolation, validate records in parse() before returning them.
Chapter 5: Extending the Knowledge Graph¶
The knowledge graph (src/knowledge.py, 2,249 lines) contains six curated dictionaries that provide structured, authoritative data to augment vector search results.
5.1 The Six Knowledge Dictionaries¶
| Dictionary | Entries | Key Data |
|---|---|---|
| CART_TARGETS | 25 | Protein, UniProt ID, expression, diseases, approved products, resistance, toxicity |
| CART_TOXICITIES | 8 | Mechanism, grading, incidence, timing, management, biomarkers, risk factors |
| CART_MANUFACTURING | 10 | Process description, parameters, failure modes, release criteria |
| CART_BIOMARKERS | 15 | Assay method, clinical cutoff, predictive value, evidence level |
| CART_REGULATORY | 6 | Approval dates, pivotal trials, designations, REMS, subsequent approvals |
| CART_IMMUNOGENICITY | 6 | ADA incidence, humanization strategies, HLA epitopes, testing paradigms |
5.2 How to Add a New Knowledge Dictionary¶
Suppose you want to add a dictionary for manufacturing equipment/platforms. Follow this pattern:
Step 1: Define the dictionary.
# In src/knowledge.py
CART_PLATFORMS: Dict[str, Dict[str, Any]] = {
"clinimacs_prodigy": {
"manufacturer": "Miltenyi Biotec",
"type": "Closed automated system",
"capabilities": [
"T-cell activation", "Transduction", "Expansion",
"Wash", "Formulation", "Sampling"
],
"max_volume": "600 mL",
"typical_batch_size": "1e7 to 1e10 cells",
"regulatory_status": "GMP-qualified, CE-marked",
"products_using": ["Point-of-care CAR-T programs"],
"advantages": [
"Fully automated", "Closed system reduces contamination",
"Single-use tubing set", "Reproducible process"
],
"limitations": [
"Single batch at a time", "Limited scalability",
"Operator training required", "High consumable cost"
],
},
# ... more entries ...
}
Step 2: Write the get_*_context() function.
def get_platform_context(platform: str) -> str:
"""Return formatted knowledge for a manufacturing platform."""
key = platform.lower().replace(" ", "_").replace("-", "_")
data = CART_PLATFORMS.get(key)
    if not data:
        for k in CART_PLATFORMS:
            if key in k:
                data = CART_PLATFORMS[k]
                key = k  # display the canonical key, not the raw input
                break
    if not data:
        return ""
    lines = [f"## Platform: {key.replace('_', ' ').title()}"]
lines.append(f"- **Manufacturer:** {data['manufacturer']}")
lines.append(f"- **Type:** {data['type']}")
if data.get("capabilities"):
lines.append(f"- **Capabilities:** {', '.join(data['capabilities'])}")
if data.get("advantages"):
lines.append("- **Advantages:**")
for a in data["advantages"]:
lines.append(f" - {a}")
return "\n".join(lines)
Step 3: Register in get_all_context_for_query().
Add a new keyword-matching block:
# Check platforms
platform_keywords = {
"clinimacs_prodigy": ["PRODIGY", "CLINIMACS"],
"lonza_cocoon": ["COCOON", "LONZA"],
# ...
}
for plat_id, keywords in platform_keywords.items():
if any(kw in query_upper for kw in keywords):
ctx = get_platform_context(plat_id)
if ctx:
sections.append(ctx)
Step 4: Add entity aliases.
In ENTITY_ALIASES:
"PRODIGY": {"type": "platform", "canonical": "clinimacs_prodigy"},
"CLINIMACS": {"type": "platform", "canonical": "clinimacs_prodigy"},
Step 5: Update get_knowledge_stats().
def get_knowledge_stats() -> Dict[str, int]:
return {
# ... existing entries ...
"manufacturing_platforms": len(CART_PLATFORMS),
}
5.3 Keyword Routing¶
The get_all_context_for_query() function uses a simple but effective routing strategy: uppercase string matching against keyword lists. Each knowledge domain has its own block with domain-specific keywords. The function returns early per-domain (using break for manufacturing and biomarkers) to avoid injecting too much context from a single domain.
This design is intentionally simple. A future improvement would use the embedding model to classify which knowledge domains are relevant, but keyword matching is fast (sub-millisecond) and reliable for known terms.
5.4 Entity Resolution for Comparisons¶
The resolve_comparison_entity() function resolves free-text entity names to structured knowledge entries. It checks five sources in priority order:
1. Target antigens (exact match against CART_TARGETS keys)
2. Product/alias table (ENTITY_ALIASES -- 54 entries)
3. Toxicities (exact match against CART_TOXICITIES keys)
4. Manufacturing processes (fuzzy substring match)
5. Biomarkers (case-insensitive match)
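A condensed, hypothetical version of that priority chain (the tables here are toy stand-ins for the real knowledge.py data, and only the first three sources are sketched):

```python
# Toy stand-ins for the real knowledge dictionaries.
CART_TARGETS = {"CD19": {}, "BCMA": {}}
ENTITY_ALIASES = {"YESCARTA": {"type": "product", "canonical": "axi-cel"}}
CART_TOXICITIES = {"crs": {}}

def resolve_comparison_entity(name: str):
    """Resolve a free-text entity name, checking sources in priority order."""
    upper = name.upper()
    if upper in CART_TARGETS:                 # 1. target antigens
        return ("target", upper)
    if upper in ENTITY_ALIASES:               # 2. product/alias table
        return ("alias", ENTITY_ALIASES[upper]["canonical"])
    if name.lower() in CART_TOXICITIES:       # 3. toxicities
        return ("toxicity", name.lower())
    return None                               # sources 4-5 omitted in this sketch
```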
Chapter 6: Query Expansion Engineering¶
The query expansion system (src/query_expansion.py, 1,592 lines) contains 12 expansion maps with hundreds of keyword-to-term mappings.
6.1 How Expansion Works¶
def expand_query(query: str) -> List[str]:
query_lower = query.lower()
matched_terms: Set[str] = set()
for category, mapping in ALL_EXPANSION_MAPS:
for keyword, terms in mapping.items():
if keyword in query_lower:
matched_terms.update(terms)
return sorted(matched_terms)
The function scans the lowercased query for every keyword in every map. When a keyword matches, all of its associated terms are added to a set (automatic deduplication).
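Because the function is self-contained, it is easy to exercise with toy maps (the maps below are illustrative; the real ones in query_expansion.py are far larger):

```python
from typing import List, Set

# Toy expansion maps standing in for the real 12-map list.
TARGET_ANTIGEN_EXPANSION = {"cd19": ["CD19", "tisagenlecleucel"]}
TOXICITY_EXPANSION = {"crs": ["CRS", "cytokine release syndrome", "tocilizumab"]}
ALL_EXPANSION_MAPS = [
    ("Target Antigen", TARGET_ANTIGEN_EXPANSION),
    ("Toxicity", TOXICITY_EXPANSION),
]

def expand_query(query: str) -> List[str]:
    query_lower = query.lower()
    matched: Set[str] = set()
    for _category, mapping in ALL_EXPANSION_MAPS:
        for keyword, terms in mapping.items():
            if keyword in query_lower:
                matched.update(terms)
    return sorted(matched)

terms = expand_query("How severe is CRS after CD19 CAR-T?")
# Both the "crs" and "cd19" keywords match, and the set union dedupes.
```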
6.2 The 12 Expansion Maps¶
ALL_EXPANSION_MAPS: List[tuple] = [
("Target Antigen", TARGET_ANTIGEN_EXPANSION), # 27 entries
("Disease", DISEASE_EXPANSION), # 16 entries
("Toxicity", TOXICITY_EXPANSION), # 12 entries
("Manufacturing", MANUFACTURING_EXPANSION), # 14 entries
("Mechanism", MECHANISM_EXPANSION), # 14 entries
("Construct", CONSTRUCT_EXPANSION), # 18 entries
("Safety", SAFETY_EXPANSION), # 8 entries
("Biomarker", BIOMARKER_EXPANSION), # 13 entries
("Regulatory", REGULATORY_EXPANSION), # 8 entries
("Sequence", SEQUENCE_EXPANSION), # 8 entries
("RealWorld", REALWORLD_EXPANSION), # 10 entries
("Immunogenicity", IMMUNOGENICITY_EXPANSION), # 12 entries
]
6.3 Writing Effective Expansion Maps¶
Each map entry is a keyword and a list of semantically related terms:
"crs": [
"CRS", "cytokine release syndrome", "cytokine storm",
"tocilizumab", "Actemra", "siltuximab",
"IL-6", "IL-6 receptor", "sIL-6R",
"ferritin", "CRP", "C-reactive protein",
"fever", "hypotension", "hypoxia",
"Lee grading", "ASTCT grading",
"grade 3 CRS", "grade 4 CRS",
"vasopressors", "dexamethasone",
],
Keyword selection strategy:
- Use lowercase, short, unambiguous keywords. "crs" is better than "cytokine release syndrome" because it matches more user inputs.
- Avoid keywords that are common English words. "expansion" as a keyword matches both "T-cell expansion" (manufacturing) and "expansion of clinical trials" (unrelated). If ambiguity is unavoidable, rely on the scoring system to deprioritize irrelevant results.
- Include product names in the term lists of disease maps. Users asking about "multiple myeloma" need to find BCMA-targeted products.
Term list design:
- Include the canonical term first (e.g., "CRS" for the "crs" keyword).
- Include synonyms and abbreviations ("cytokine release syndrome", "cytokine storm").
- Include related treatment names ("tocilizumab", "dexamethasone").
- Include biomarkers ("ferritin", "CRP", "IL-6").
- Include grading systems ("Lee grading", "ASTCT grading").
- Keep lists to 10-25 terms. Larger lists increase search breadth but dilute precision.
6.4 Avoiding Expansion Explosion¶
The RAG engine caps expansion to the first 5 expansion terms:
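The cap itself is a one-line slice. A minimal sketch (the constant name `MAX_EXPANSION_TERMS` is invented here; check `rag_engine.py` for the actual name):

```python
MAX_EXPANSION_TERMS = 5  # hard cap on expansion terms carried into retrieval

def capped_expansion(expanded_terms: list) -> list:
    # Only the first N expansion terms are used; the rest are dropped
    # to keep the result volume bounded.
    return expanded_terms[:MAX_EXPANSION_TERMS]

print(capped_expansion([f"term{i}" for i in range(60)]))
# ['term0', 'term1', 'term2', 'term3', 'term4']
```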
Without this cap, a query about "CD19 CAR-T CRS" could expand into 60+ terms, each generating 2+ additional hits across 11 collections -- leading to hundreds of low-relevance results drowning out the primary search.
Expansion hits are also score-penalized:
- Antigen field-filter expansion hits: `score * 0.8`
- Semantic re-embedding expansion hits: `score * 0.7`
This ensures expansion results appear after direct hits in the ranked list.
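The penalties compose with ranking as a simple multiply-then-sort. The hit dicts and `source` tags below are simplified stand-ins for the engine's internal representation:

```python
# Illustrative penalty application; field names are simplified stand-ins.
ANTIGEN_EXPANSION_PENALTY = 0.8
SEMANTIC_EXPANSION_PENALTY = 0.7

def penalize(hit: dict) -> dict:
    if hit.get("source") == "antigen_expansion":
        hit["score"] *= ANTIGEN_EXPANSION_PENALTY
    elif hit.get("source") == "semantic_expansion":
        hit["score"] *= SEMANTIC_EXPANSION_PENALTY
    return hit

direct = {"score": 0.80, "source": "direct"}
expanded = {"score": 0.80, "source": "semantic_expansion"}
ranked = sorted([penalize(direct), penalize(expanded)],
                key=lambda h: h["score"], reverse=True)
# The direct hit (0.80) now outranks the expansion hit (0.80 * 0.7 = 0.56).
```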
6.5 Category-Aware Expansion¶
The expand_query_by_category() function returns terms grouped by their source map:
>>> expand_query_by_category("transduction efficiency for CD19 CAR")
{
'Target Antigen': ['CD19', 'B-ALL', 'DLBCL', ...],
'Manufacturing': ['transduction efficiency', 'lentiviral vector', ...],
}
This is useful for weighted category-specific searches where manufacturing terms should receive higher weight when searching cart_manufacturing.
6.6 Testing Expansion¶
Use get_expansion_stats() to verify your maps are registered:
>>> from src.query_expansion import get_expansion_stats
>>> stats = get_expansion_stats()
>>> for category, data in stats.items():
... print(f"{category}: {data['keywords']} keywords, {data['total_terms']} total terms")
Target Antigen: 27 keywords, 398 total terms
Disease: 16 keywords, 224 total terms
...
Chapter 7: The Comparative Analysis System¶
7.1 How retrieve_comparative() Works¶
When a user asks "Compare Yescarta vs Kymriah for DLBCL," the system detects this as a comparative query and triggers retrieve_comparative().
Detection:
def _is_comparative(self, question: str) -> bool:
q_upper = question.upper()
return ("COMPARE" in q_upper or " VS " in q_upper
or "VERSUS" in q_upper or "COMPARING" in q_upper)
Entity parsing:
def _parse_comparison_entities(self, question: str):
    q = question.strip()
    # Try "X vs Y" pattern first
    match = re.search(
        r'(.+?)\s+(?:vs\.?|versus)\s+(.+)$',
        q, re.IGNORECASE,
    )
    # Fallback: "compare X and/with Y"
    if not match:
        match = re.search(
            r'(?:compare|comparing)\s+(.+?)\s+(?:and|with)\s+(.+?)(?:\s+(?:for|in)\b.*)?$',
            q, re.IGNORECASE,
        )
The parser strips trailing qualifiers ("costimulatory domains", "for DLBCL", "efficacy") and resolves each entity through resolve_comparison_entity() in the knowledge graph.
Dual retrieval:
evidence_a = self.retrieve(query_a, ...) # Yescarta-focused search
evidence_b = self.retrieve(query_b, ...) # Kymriah-focused search
Two independent retrieve() calls are made, each with the entity's target antigen set as a field filter. This ensures each entity gets its own set of evidence.
Comparison context:
get_comparison_context() calls the appropriate get_*_context() functions for both entities and joins them with a separator. This provides structured knowledge (approval dates, toxicity profiles, mechanism data) that the LLM can use to build a comparison table.
7.2 The Comparative Prompt¶
The comparative prompt instructs the LLM to produce structured output:
f"1. A **comparison table** in markdown format with key dimensions "
f"as rows and the two entities as columns.\n"
f"2. **Advantages** of each entity (bulleted list).\n"
f"3. **Limitations** of each entity (bulleted list).\n"
f"4. A **clinical context** paragraph explaining when each might "
f"be preferred.\n\n"
7.3 ComparativeResult Model¶
The result is a ComparativeResult containing two CrossCollectionResult instances:
class ComparativeResult(BaseModel):
query: str
entity_a: str
entity_b: str
evidence_a: CrossCollectionResult
evidence_b: CrossCollectionResult
comparison_context: str = ""
total_search_time_ms: float = 0.0
@property
def total_hits(self) -> int:
return self.evidence_a.hit_count + self.evidence_b.hit_count
Chapter 8: Export System Deep Dive¶
The export system (src/export.py, 1,487 lines) generates reports in three formats.
8.1 Markdown Export¶
export_markdown() produces a human-readable report with:
- Header: query, timestamp, filters
- Response: the LLM-generated answer verbatim
- Evidence: collection-specific tables using `_format_evidence_table()`
- Knowledge context: verbatim knowledge graph text
- Search metrics: results count, collections searched, latency
- Footer: version string
Each collection has its own table column layout. For example, Literature gets ID | Score | Source | Title | Year | Target | Journal, while Safety gets ID | Score | Product | Event | Severity | Onset | Source.
8.2 JSON Export¶
export_json() produces machine-readable output using Pydantic's .model_dump() for proper serialization.
This recursively serializes all SearchHit objects, including their metadata dicts, into a JSON-serializable structure.
8.3 PDF Export Architecture¶
The PDF export uses reportlab's Platypus (Page Layout and Typography Using Scripts) framework. Key components:
Color palette:
_NVIDIA_GREEN = colors.HexColor("#76B900")
_DARK_BG = colors.HexColor("#1a1a1a")
_TABLE_ALT = colors.HexColor("#f0f0f0")
_LIGHT_GRAY = colors.HexColor("#666666")
Custom styles:
def _build_pdf_styles() -> dict:
return {
"title": ParagraphStyle("PDFTitle", fontSize=22, textColor=_NVIDIA_GREEN),
"h2": ParagraphStyle("PDFH2", fontSize=14, textColor=_NVIDIA_GREEN),
"h3": ParagraphStyle("PDFH3", fontSize=11, textColor="#333333"),
"body": ParagraphStyle("PDFBody", fontSize=9, leading=13),
"meta": ParagraphStyle("PDFMeta", fontSize=9, textColor=_LIGHT_GRAY),
"footer": ParagraphStyle("PDFFooter", fontSize=8, textColor=_LIGHT_GRAY),
}
Document construction:
buffer = io.BytesIO()
doc = SimpleDocTemplate(
buffer, pagesize=letter,
leftMargin=0.6*inch, rightMargin=0.6*inch,
topMargin=0.6*inch, bottomMargin=0.6*inch,
)
The PDF is built into an in-memory BytesIO buffer, then returned as bytes. No temporary files are created.
Evidence tables:
Each collection gets a styled Table with:
- NVIDIA green header row (_NVIDIA_GREEN background, white text)
- Alternating row colors (white and _TABLE_ALT gray)
- Grid lines in light gray
- Wrapped text using Paragraph objects in cells (7pt font)
Markdown-to-flowables conversion:
The _md_to_flowables() function converts the LLM's markdown response into reportlab flowables:
- `## headings` become `Paragraph` with `h2` style
- `### headings` become `Paragraph` with `h3` style
- `**bold**` becomes `<b>bold</b>` XML tags
- `[text](url)` becomes `<a href="url" color="#76B900">text</a>` hyperlinks
- Markdown tables are parsed and rendered as styled reportlab `Table` objects
- Bullet lists (`- item`) are converted to `• item`
- Block quotes (`> text`) get indented italic styling
8.4 Adding a New Export Format¶
To add an export format (e.g., DOCX), follow this pattern:
1. Create a new public function: `export_docx(query, response_text, evidence, comp_result, filters_applied) -> bytes`
2. Build the document using the same data (query, response, evidence, knowledge context, metrics)
3. Return the content as bytes
4. Add a download button in the Streamlit UI
Chapter 9: Testing Strategies¶
9.1 The Mock-Everything Approach¶
The test suite runs without Milvus, without the embedding model, and without the LLM. This is achieved through the fixtures in tests/conftest.py:
@pytest.fixture
def mock_embedder():
"""384-dim zero vectors."""
embedder = MagicMock()
embedder.embed_text.return_value = [0.0] * 384
return embedder
@pytest.fixture
def mock_llm_client():
"""Always responds 'Mock response'."""
client = MagicMock()
client.generate.return_value = "Mock response"
client.generate_stream.return_value = iter(["Mock ", "response"])
return client
@pytest.fixture
def mock_collection_manager():
"""search() returns [], search_all() returns {}, stats returns 42."""
manager = MagicMock()
manager.search.return_value = []
manager.search_all.return_value = {name: [] for name in collection_names}
manager.get_collection_stats.return_value = {name: 42 for name in collection_names}
return manager
The mock_embedder returns 384-dimensional zero vectors. This is sufficient for testing the flow -- we are not testing embedding quality, we are testing that the engine correctly assembles, weights, deduplicates, and formats results.
9.2 Unit Test Patterns¶
Testing the RAG engine:
def test_retrieve_returns_cross_collection_result(
mock_embedder, mock_llm_client, mock_collection_manager
):
engine = CARTRAGEngine(
collection_manager=mock_collection_manager,
embedder=mock_embedder,
llm_client=mock_llm_client,
)
query = AgentQuery(question="What is CD19?")
result = engine.retrieve(query)
assert isinstance(result, CrossCollectionResult)
assert result.query == "What is CD19?"
assert result.total_collections_searched == 11
Testing models:
def test_clinical_trial_id_validation():
with pytest.raises(ValidationError):
ClinicalTrial(
id="INVALID", # Must match ^NCT\d{8}$
title="Test",
text_summary="Test",
)
Testing knowledge graph:
def test_get_target_context_cd19():
ctx = get_target_context("CD19")
assert "B-Lymphocyte Antigen CD19" in ctx
assert "ELIANA" in ctx
assert "Kymriah" in ctx
Testing query expansion:
def test_crs_expansion_includes_tocilizumab():
terms = expand_query("What causes CRS?")
assert "tocilizumab" in terms
assert "IL-6" in terms
Testing export:
def test_export_markdown_structure(sample_evidence):
md = export_markdown(
query="Test query",
response_text="Test response",
evidence=sample_evidence,
)
assert "# CAR-T Intelligence Report" in md
assert "Test query" in md
assert "Evidence Sources" in md
9.3 Testing Without Milvus¶
The entire test suite runs without Milvus because mock_collection_manager replaces all Milvus interactions with MagicMock return values. When you need to test actual Milvus behavior:
@pytest.fixture(scope="session")
def live_manager():
"""For integration tests only -- requires running Milvus."""
manager = CARTCollectionManager(host="localhost", port=19530)
manager.connect()
yield manager
manager.disconnect()
@pytest.mark.integration
def test_insert_and_search(live_manager):
"""Requires Milvus. Run with: pytest -m integration"""
live_manager.create_collection("test_coll", LITERATURE_SCHEMA, drop_existing=True)
# ... insert records, search, verify results ...
live_manager.drop_collection("test_coll")
Mark integration tests with @pytest.mark.integration so they are skipped during normal CI runs.
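If the `integration` marker is not registered, pytest emits `PytestUnknownMarkWarning`. Assuming the repo does not already register it elsewhere, a `pytest.ini` along these lines registers the marker and excludes integration tests by default:

```ini
[pytest]
markers =
    integration: requires a running Milvus instance
# A later -m on the command line (pytest -m integration) overrides this default
addopts = -m "not integration"
```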
9.4 Property-Based Testing Ideas¶
For properties that should always hold:
from hypothesis import given, strategies as st
@given(st.text(min_size=1, max_size=500))
def test_expand_query_never_raises(query):
"""expand_query should handle any input without exceptions."""
result = expand_query(query)
assert isinstance(result, list)
@given(st.lists(
st.builds(SearchHit, collection=st.text(), id=st.text(), score=st.floats(0,1), text=st.text()),
min_size=0, max_size=100,
))
def test_merge_and_rank_caps_at_30(hits):
"""Merged results should never exceed 30."""
engine = CARTRAGEngine(MagicMock(), MagicMock(), MagicMock())
result = engine._merge_and_rank(hits)
assert len(result) <= 30
Chapter 10: Performance Optimization¶
10.1 Parallel Search Tuning¶
The search_all() method uses ThreadPoolExecutor(max_workers=len(collections)) -- 11 threads for 11 collections. This is I/O-bound work (waiting for Milvus network responses), so more threads than CPU cores is appropriate.
If you add more collections and observe thread contention, you can cap the thread pool:
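A sketch of the capped pool, with `MAX_SEARCH_THREADS` and the `search_one` callable invented for this example:

```python
from concurrent.futures import ThreadPoolExecutor

# Sketch: bound the pool instead of one-thread-per-collection.
# MAX_SEARCH_THREADS is a name invented for this example.
MAX_SEARCH_THREADS = 16

def search_all(collections, search_one):
    workers = min(len(collections), MAX_SEARCH_THREADS)
    with ThreadPoolExecutor(max_workers=workers) as pool:
        # Milvus lookups are I/O-bound, so the threads overlap network waits.
        return dict(zip(collections, pool.map(search_one, collections)))

results = search_all(["cart_literature", "cart_trials"], lambda name: [])
# {'cart_literature': [], 'cart_trials': []}
```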
10.2 Embedding Caching¶
For frequently repeated queries (e.g., the same entity lookup in find_related()), embedding the same text repeatedly wastes GPU cycles. A simple LRU cache:
from functools import lru_cache
class CachedEmbedder:
def __init__(self, base_embedder, maxsize=1024):
self._base = base_embedder
self._embed = lru_cache(maxsize=maxsize)(self._compute)
def _compute(self, text: str) -> tuple:
return tuple(self._base.embed_text(text))
def embed_text(self, text: str) -> list:
return list(self._embed(text))
Wrap the embedder before injecting it into the engine. The lru_cache requires hashable inputs, so we convert the list to a tuple internally.
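A quick way to confirm the wrapper short-circuits repeated lookups is a stub embedder that counts calls (`CachedEmbedder` here is the class above, condensed; `StubEmbedder` is invented for the check):

```python
from functools import lru_cache

class CachedEmbedder:  # condensed copy of the class above
    def __init__(self, base_embedder, maxsize=1024):
        self._base = base_embedder
        self._embed = lru_cache(maxsize=maxsize)(self._compute)

    def _compute(self, text: str) -> tuple:
        return tuple(self._base.embed_text(text))

    def embed_text(self, text: str) -> list:
        return list(self._embed(text))

class StubEmbedder:
    """Counts how often the real model would have been invoked."""
    def __init__(self):
        self.calls = 0

    def embed_text(self, text):
        self.calls += 1
        return [0.0] * 384

stub = StubEmbedder()
cached = CachedEmbedder(stub)
cached.embed_text("CD19")
cached.embed_text("CD19")   # second call served from the cache
assert stub.calls == 1
```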
10.3 Milvus Index Parameters¶
The two critical parameters:
nlist (index build): Currently 1024. This creates 1024 cluster centroids. The rule of thumb is nlist = 4 * sqrt(N) where N is the number of vectors. For 10,000 vectors: 4 * sqrt(10000) = 400. For 100,000 vectors: 4 * sqrt(100000) = 1264. The current value of 1024 is appropriate for collections up to ~65,000 vectors.
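The rule of thumb is worth scripting when sizing a new collection; `int()` truncation reproduces the figures above:

```python
import math

def recommended_nlist(num_vectors: int) -> int:
    # Rule of thumb: nlist ~= 4 * sqrt(N), truncated to an integer.
    return int(4 * math.sqrt(num_vectors))

print(recommended_nlist(10_000))    # 400
print(recommended_nlist(100_000))   # 1264
print(recommended_nlist(65_536))    # 1024 -- the current setting's sweet spot
```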
nprobe (search): Currently 16. Increasing nprobe improves recall at the cost of latency. Benchmark your specific data:
| nprobe | Recall@10 | Latency (ms) |
|---|---|---|
| 4 | ~85% | ~1.5 |
| 16 | ~97% | ~4.0 |
| 64 | ~99% | ~12.0 |
| 256 | ~99.9% | ~40.0 |
10.4 Score Threshold Tuning¶
The default SCORE_THRESHOLD=0.4 filters out results with cosine similarity below 0.4. This is intentionally permissive -- better to include a marginally relevant result than miss important evidence. If you are seeing too many low-quality results:
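raise `CART_SCORE_THRESHOLD` (e.g., to 0.55) in the environment rather than editing code. The post-search cut can be sketched as:

```python
# Sketch of the post-search filter; the threshold normally comes from
# CART_SCORE_THRESHOLD (default 0.4).
def apply_threshold(hits, threshold=0.4):
    return [h for h in hits if h["score"] >= threshold]

hits = [{"id": "a", "score": 0.91},
        {"id": "b", "score": 0.45},
        {"id": "c", "score": 0.31}]
print(len(apply_threshold(hits)))         # 2 hits survive at 0.4
print(len(apply_threshold(hits, 0.55)))   # raising it to 0.55 keeps only 1
```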
Monitor the effect on evidence count per query using the cart_evidence_count Prometheus histogram.
10.5 Batch Size Optimization¶
The default EMBEDDING_BATCH_SIZE=32 balances GPU memory usage and throughput. On the DGX Spark with 128GB unified memory:
| Batch Size | Throughput (texts/sec) | GPU Memory (MB) |
|---|---|---|
| 8 | ~200 | ~100 |
| 32 | ~500 | ~200 |
| 64 | ~600 | ~350 |
| 128 | ~650 | ~600 |
Beyond 64, throughput gains are minimal because BGE-small is a lightweight model. Use 32 as the default; increase to 64 if ingesting large datasets.
Chapter 11: Production Deployment¶
11.1 Docker Multi-Stage Build¶
Use a multi-stage Dockerfile to minimize image size:
# Stage 1: Build
FROM python:3.10-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Runtime
FROM python:3.10-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY . .
# Download embedding model at build time
RUN python -c "from sentence_transformers import SentenceTransformer; SentenceTransformer('BAAI/bge-small-en-v1.5')"
EXPOSE 8521 8522
CMD ["uvicorn", "api.main:app", "--host", "0.0.0.0", "--port", "8522"]
The key optimization is downloading the embedding model at build time. This prevents a ~300MB download on every container start.
11.2 docker-compose Production Config¶
version: '3.8'
services:
cart-api:
build: .
ports:
- "8522:8522"
environment:
- CART_MILVUS_HOST=milvus
- CART_MILVUS_PORT=19530
- CART_LLM_PROVIDER=anthropic
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
- CART_METRICS_ENABLED=true
- CART_INGEST_ENABLED=true
depends_on:
milvus:
condition: service_healthy
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:8522/health"]
interval: 30s
timeout: 10s
retries: 3
cart-ui:
build: .
command: streamlit run app/cart_ui.py --server.port 8521
ports:
- "8521:8521"
environment:
- CART_MILVUS_HOST=milvus
- CART_MILVUS_PORT=19530
- ANTHROPIC_API_KEY=${ANTHROPIC_API_KEY}
depends_on:
milvus:
condition: service_healthy
milvus:
image: milvusdb/milvus:v2.4-latest
ports:
- "19530:19530"
volumes:
- milvus_data:/var/lib/milvus
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
interval: 10s
timeout: 5s
retries: 5
volumes:
milvus_data:
11.3 Environment Variable Management¶
All configuration flows through CARTSettings (Pydantic BaseSettings with env_prefix="CART_"):
| Variable | Default | Description |
|---|---|---|
| CART_MILVUS_HOST | localhost | Milvus server hostname |
| CART_MILVUS_PORT | 19530 | Milvus server port |
| CART_LLM_MODEL | claude-sonnet-4-6 | Anthropic model ID |
| CART_TOP_K_PER_COLLECTION | 5 | Max results per collection |
| CART_SCORE_THRESHOLD | 0.4 | Min cosine similarity |
| CART_WEIGHT_LITERATURE | 0.20 | Literature collection weight |
| CART_INGEST_SCHEDULE_HOURS | 168 | Scheduler interval (hours) |
| CART_INGEST_ENABLED | false | Enable background ingest |
| CART_METRICS_ENABLED | true | Enable Prometheus metrics |
| CART_CITATION_HIGH_THRESHOLD | 0.75 | Score for "high relevance" |
| CART_CITATION_MEDIUM_THRESHOLD | 0.60 | Score for "medium relevance" |
| CART_MAX_CONVERSATION_CONTEXT | 3 | Prior exchanges in memory |
11.4 Health Checks¶
The FastAPI /health endpoint returns collection count and total vector count. It returns HTTP 503 if the engine is not initialized or Milvus is unavailable.
11.5 Monitoring with Grafana¶
The src/metrics.py module exposes Prometheus metrics with the cart_ prefix. Key metrics:
| Metric | Type | Labels | Description |
|---|---|---|---|
| cart_query_latency_seconds | Histogram | query_type | Processing time per query |
| cart_queries_total | Counter | query_type, status | Total queries by type and status |
| cart_evidence_count | Histogram | -- | Evidence items returned per query |
| cart_collection_hits_total | Counter | collection | Hits by collection |
| cart_llm_tokens_total | Counter | direction | LLM token usage |
| cart_collection_size | Gauge | collection | Current records per collection |
| cart_last_ingest_timestamp | Gauge | source | Last ingest run timestamp |
| cart_active_connections | Gauge | -- | Active API connections |
Create a Grafana dashboard with panels for:
- Query latency P50/P95/P99 (histogram)
- Queries per minute by type (counter rate)
- Evidence count distribution (histogram)
- Collection sizes over time (gauge)
- Last ingest freshness (gauge, alert if >168h stale)
Chapter 12: Integration with HCLS AI Factory¶
12.1 The 3-Stage Pipeline¶
The HCLS AI Factory runs three stages:
1. Genomics Pipeline (120-240 min): FASTQ -> VCF via Parabricks (BWA-MEM2, DeepVariant). Produces 11.7M variants.
2. RAG/Chat Pipeline (interactive): ClinVar (~2.7M) + AlphaMissense (71M) + Milvus (3.5M vectors) + Claude. Produces the `genomic_evidence` Milvus collection.
3. Drug Discovery Pipeline (8-16 min): MolMIM generation + DiffDock docking + RDKit scoring.
The CAR-T Intelligence Agent is a new module that sits alongside Stage 2, reading from the same Milvus instance and adding 10 domain-specific collections.
12.2 The Genomic Evidence Bridge¶
The genomic_evidence collection in Milvus is created by the RAG/Chat Pipeline (Stage 2), not by the CAR-T agent. The CAR-T agent treats it as read-only:
# In collections.py
COLLECTION_MODELS: Dict[str, type] = {
# ... 10 owned collections ...
"genomic_evidence": None, # Read-only (no inserts from this agent)
}
The schema matches the RAG/Chat Pipeline's output:
GENOMIC_EVIDENCE_FIELDS = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=200),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=384),
FieldSchema(name="chrom", dtype=DataType.VARCHAR, max_length=10),
FieldSchema(name="pos", dtype=DataType.INT64),
FieldSchema(name="gene", dtype=DataType.VARCHAR, max_length=50),
FieldSchema(name="consequence", dtype=DataType.VARCHAR, max_length=100),
FieldSchema(name="clinical_significance", dtype=DataType.VARCHAR, max_length=200),
FieldSchema(name="am_pathogenicity", dtype=DataType.FLOAT),
FieldSchema(name="am_class", dtype=DataType.VARCHAR, max_length=30),
# ...
]
When a user asks "What genomic variants affect CD19 CAR-T response?", the agent searches this collection alongside all 10 CAR-T collections, combining patient-specific genomic data with published literature and clinical trial evidence.
12.3 Sharing Milvus with the RAG/Chat Pipeline¶
Both the CAR-T agent and the RAG/Chat Pipeline connect to the same Milvus instance at port 19530. They use different collection names to avoid conflicts:
- RAG/Chat Pipeline: `clinvar_variants`, `alpha_missense`, `genomic_evidence`
- CAR-T Agent: `cart_literature`, `cart_trials`, `cart_constructs`, etc.
The shared Milvus instance is the integration point. No additional coordination is needed -- Milvus handles concurrent reads from multiple clients.
12.4 The VAST AI OS AgentEngine Model¶
The CARTIntelligenceAgent class follows the VAST AI OS AgentEngine pattern:
AgentEngine.run()
|-> Plan -> search_plan()
|-> Execute -> rag_engine.retrieve()
|-> Reflect -> evaluate_evidence()
|-> Report -> generate_report()
This pattern maps directly to VAST AI OS orchestration, where an AgentEngine receives a task, plans its approach, executes subtasks, reflects on results, and produces output.
Chapter 13: Future Architecture¶
13.1 Multi-Agent Systems¶
The current single-agent architecture can be extended to a multi-agent system where specialized agents collaborate:
- Literature Agent: Focused PubMed/PMC search with citation network analysis
- Clinical Agent: ClinicalTrials.gov expert with enrollment prediction
- Manufacturing Agent: CMC process optimization using historical batch data
- Safety Agent: Pharmacovigilance specialist with FAERS signal detection
- Orchestrator Agent: Routes queries to the appropriate specialist agent(s)
13.2 Graph Databases for Knowledge¶
The current knowledge graph is a Python dictionary. As the knowledge grows, migrating to a graph database (Neo4j, Amazon Neptune) would enable:
- Relationship traversal: "Find all products targeting the same antigen as Kymriah"
- Path queries: "What connects CRS to CD28 costimulation through IL-6?"
- Graph neural networks for knowledge-graph-augmented retrieval
13.3 Fine-Tuned Domain Embeddings¶
BGE-small-en-v1.5 is a general-purpose embedding model. Fine-tuning on CAR-T literature pairs would improve retrieval quality for domain-specific jargon:
- "scFv" should be closer to "single-chain variable fragment" than to "standard cable variant"
- "4-1BB" should be closer to "CD137 costimulatory" than to "4-1 basketball"
- "Flu/Cy" should be closer to "lymphodepletion conditioning" than to "influenza/cytomegalovirus"
13.4 Real-Time Data Streaming¶
The current scheduler polls PubMed and ClinicalTrials.gov weekly. A streaming architecture would enable:
- Kafka/Pulsar topics for new publications, trial updates, and safety signals
- Change data capture (CDC) from FAERS updates
- Real-time alerts when new evidence matches a user's saved query
13.5 VAST AI OS Integration¶
The VAST AI OS platform provides:
- AgentEngine for stateful agent execution
- ModelEngine for serving BGE-small-en-v1.5 at scale
- DataEngine for Milvus management and data pipelines
- StorageEngine for the DGX Spark's unified memory architecture
Integration would enable the agent to run as a managed service with automatic scaling, health monitoring, and centralized logging.
Appendix A: Complete API Reference¶
GET /health¶
Returns service health with collection and vector counts.
Returns 503 if the engine or Milvus connection is unavailable.
GET /collections¶
Returns all collection names and their record counts.
Response:
{
"collections": [
{"name": "cart_literature", "record_count": 350},
{"name": "cart_trials", "record_count": 200},
{"name": "cart_constructs", "record_count": 45},
{"name": "cart_assays", "record_count": 120},
{"name": "cart_manufacturing", "record_count": 80},
{"name": "cart_safety", "record_count": 95},
{"name": "cart_biomarkers", "record_count": 60},
{"name": "cart_regulatory", "record_count": 42},
{"name": "cart_sequences", "record_count": 35},
{"name": "cart_realworld", "record_count": 70},
{"name": "genomic_evidence", "record_count": 150}
],
"total": 11
}
POST /query¶
Full RAG query: retrieve evidence from Milvus, augment with the knowledge graph, and synthesize an LLM response. Requires both the embedding model and LLM client to be available.
curl -X POST http://localhost:8522/query \
-H "Content-Type: application/json" \
-d '{
"question": "What are the resistance mechanisms for CD19 CAR-T therapy?",
"target_antigen": "CD19",
"year_min": 2020,
"year_max": 2026
}'
Request body:

| Field | Type | Required | Description |
|---|---|---|---|
| question | string | Yes | Natural-language question |
| target_antigen | string | No | Filter by target antigen (e.g., CD19, BCMA) |
| collections | list[string] | No | Restrict search to specific collections |
| year_min | int | No | Minimum publication year (1990-2030) |
| year_max | int | No | Maximum publication year (1990-2030) |
Response:
{
"question": "What are the resistance mechanisms for CD19 CAR-T therapy?",
"answer": "CD19 CAR-T therapy faces several resistance mechanisms...",
"evidence": [
{
"collection": "Literature",
"id": "35123456",
"score": 0.912,
"text": "CD19 loss occurs in 10-20% of patients...",
"metadata": {"title": "...", "year": 2023}
}
],
"knowledge_context": "## Target Antigen: CD19\n...",
"collections_searched": 11,
"search_time_ms": 34.5
}
POST /search¶
Evidence-only retrieval (no LLM). Returns evidence snippets without synthesis. Useful for fast lookups or when the client handles its own synthesis.
curl -X POST http://localhost:8522/search \
-H "Content-Type: application/json" \
-d '{
"question": "Tocilizumab dosing for CRS",
"collections": ["cart_safety", "cart_biomarkers"]
}'
Response schema is identical to /query but without the answer field.
POST /find-related¶
Cross-collection entity linking. Finds all evidence related to an entity (product name, target antigen, trial ID, etc.) across all 11 collections.
curl -X POST http://localhost:8522/find-related \
-H "Content-Type: application/json" \
-d '{
"entity": "Yescarta",
"top_k": 5
}'
Response:
{
"entity": "Yescarta",
"results": {
"cart_literature": [{"collection": "Literature", "id": "...", "score": 0.89, ...}],
"cart_trials": [{"collection": "Trial", "id": "NCT02348216", "score": 0.95, ...}],
"cart_constructs": [{"collection": "Construct", "id": "construct-yescarta", ...}],
"cart_safety": [{"collection": "Safety", "id": "safety-crs-yescarta", ...}],
"cart_regulatory": [{"collection": "Regulatory", "id": "reg-yescarta-bla", ...}]
},
"total_hits": 22
}
GET /knowledge/stats¶
Returns statistics about the curated knowledge graph.
Response:
{
"target_antigens": 34,
"targets_with_approved_products": 2,
"toxicity_profiles": 17,
"manufacturing_processes": 20,
"biomarkers": 23,
"regulatory_products": 6
}
GET /metrics¶
Prometheus-compatible metrics endpoint. Returns counters in Prometheus exposition format.
Response:
# HELP cart_api_requests_total Total API requests
# TYPE cart_api_requests_total counter
cart_api_requests_total 147
# HELP cart_api_query_requests_total Total /query requests
# TYPE cart_api_query_requests_total counter
cart_api_query_requests_total 42
# HELP cart_collection_vectors Number of vectors per collection
# TYPE cart_collection_vectors gauge
cart_collection_vectors{collection="cart_literature"} 350
cart_collection_vectors{collection="cart_trials"} 200
...
Appendix B: Configuration Reference¶
All fields from config/settings.py (CARTSettings):
| Field | Type | Default | Env Variable | Description |
|---|---|---|---|---|
| PROJECT_ROOT | Path | (computed) | -- | Project root directory |
| DATA_DIR | Path | PROJECT_ROOT/data | -- | Data directory |
| CACHE_DIR | Path | DATA_DIR/cache | -- | Cache directory |
| REFERENCE_DIR | Path | DATA_DIR/reference | -- | Reference data directory |
| RAG_PIPELINE_ROOT | Path | /app/rag-chat-pipeline | CART_RAG_PIPELINE_ROOT | Parent RAG pipeline path |
| MILVUS_HOST | str | localhost | CART_MILVUS_HOST | Milvus server hostname |
| MILVUS_PORT | int | 19530 | CART_MILVUS_PORT | Milvus server port |
| COLLECTION_LITERATURE | str | cart_literature | CART_COLLECTION_LITERATURE | Literature collection name |
| COLLECTION_TRIALS | str | cart_trials | CART_COLLECTION_TRIALS | Trials collection name |
| COLLECTION_CONSTRUCTS | str | cart_constructs | CART_COLLECTION_CONSTRUCTS | Constructs collection name |
| COLLECTION_ASSAYS | str | cart_assays | CART_COLLECTION_ASSAYS | Assays collection name |
| COLLECTION_MANUFACTURING | str | cart_manufacturing | CART_COLLECTION_MANUFACTURING | Manufacturing collection name |
| COLLECTION_SAFETY | str | cart_safety | CART_COLLECTION_SAFETY | Safety collection name |
| COLLECTION_BIOMARKERS | str | cart_biomarkers | CART_COLLECTION_BIOMARKERS | Biomarkers collection name |
| COLLECTION_REGULATORY | str | cart_regulatory | CART_COLLECTION_REGULATORY | Regulatory collection name |
| COLLECTION_SEQUENCES | str | cart_sequences | CART_COLLECTION_SEQUENCES | Sequences collection name |
| COLLECTION_REALWORLD | str | cart_realworld | CART_COLLECTION_REALWORLD | Real-world evidence collection name |
| COLLECTION_GENOMIC | str | genomic_evidence | CART_COLLECTION_GENOMIC | Genomic evidence collection (read-only) |
| EMBEDDING_MODEL | str | BAAI/bge-small-en-v1.5 | CART_EMBEDDING_MODEL | HuggingFace model ID |
| EMBEDDING_DIMENSION | int | 384 | CART_EMBEDDING_DIMENSION | Vector dimension |
| EMBEDDING_BATCH_SIZE | int | 32 | CART_EMBEDDING_BATCH_SIZE | Batch size for embedding |
| LLM_PROVIDER | str | anthropic | CART_LLM_PROVIDER | LLM provider |
| LLM_MODEL | str | claude-sonnet-4-6 | CART_LLM_MODEL | LLM model identifier |
| ANTHROPIC_API_KEY | str | None | ANTHROPIC_API_KEY | Anthropic API key |
| TOP_K_PER_COLLECTION | int | 5 | CART_TOP_K_PER_COLLECTION | Max results per collection |
| SCORE_THRESHOLD | float | 0.4 | CART_SCORE_THRESHOLD | Min cosine similarity |
| WEIGHT_LITERATURE | float | 0.20 | CART_WEIGHT_LITERATURE | Literature weight |
| WEIGHT_TRIALS | float | 0.16 | CART_WEIGHT_TRIALS | Trials weight |
| WEIGHT_CONSTRUCTS | float | 0.10 | CART_WEIGHT_CONSTRUCTS | Constructs weight |
| WEIGHT_ASSAYS | float | 0.09 | CART_WEIGHT_ASSAYS | Assays weight |
| WEIGHT_MANUFACTURING | float | 0.07 | CART_WEIGHT_MANUFACTURING | Manufacturing weight |
| WEIGHT_SAFETY | float | 0.08 | CART_WEIGHT_SAFETY | Safety weight |
| WEIGHT_BIOMARKERS | float | 0.08 | CART_WEIGHT_BIOMARKERS | Biomarkers weight |
| WEIGHT_REGULATORY | float | 0.06 | CART_WEIGHT_REGULATORY | Regulatory weight |
| WEIGHT_SEQUENCES | float | 0.06 | CART_WEIGHT_SEQUENCES | Sequences weight |
| WEIGHT_REALWORLD | float | 0.07 | CART_WEIGHT_REALWORLD | Real-world evidence weight |
| WEIGHT_GENOMIC | float | 0.04 | CART_WEIGHT_GENOMIC | Genomic evidence weight |
| NCBI_API_KEY | str | None | CART_NCBI_API_KEY | NCBI API key (optional, increases rate limit) |
| PUBMED_MAX_RESULTS | int | 5000 | CART_PUBMED_MAX_RESULTS | Max PubMed results per query |
| CT_GOV_BASE_URL | str | https://clinicaltrials.gov/api/v2 | CART_CT_GOV_BASE_URL | ClinicalTrials.gov API base URL |
| API_HOST | str | 0.0.0.0 | CART_API_HOST | FastAPI bind address |
| API_PORT | int | 8522 | CART_API_PORT | FastAPI port |
| STREAMLIT_PORT | int | 8521 | CART_STREAMLIT_PORT | Streamlit UI port |
| METRICS_ENABLED | bool | True | CART_METRICS_ENABLED | Enable Prometheus metrics |
| INGEST_SCHEDULE_HOURS | int | 168 | CART_INGEST_SCHEDULE_HOURS | Scheduler interval (hours) |
| INGEST_ENABLED | bool | False | CART_INGEST_ENABLED | Enable background ingest |
| MAX_CONVERSATION_CONTEXT | int | 3 | CART_MAX_CONVERSATION_CONTEXT | Prior exchanges in memory |
| CITATION_HIGH_THRESHOLD | float | 0.75 | CART_CITATION_HIGH_THRESHOLD | Score for high relevance |
| CITATION_MEDIUM_THRESHOLD | float | 0.60 | CART_CITATION_MEDIUM_THRESHOLD | Score for medium relevance |
Appendix C: Metric Reference¶
Prometheus Metrics (src/metrics.py)¶
All metrics use the cart_ prefix.
Histograms:
| Metric | Labels | Buckets | Description |
|---|---|---|---|
| cart_query_latency_seconds | query_type | 0.1, 0.5, 1, 2, 5, 10, 30 | Wall-clock processing time per query |
| cart_evidence_count | -- | 0, 5, 10, 15, 20, 25, 30 | Evidence items returned per query |
Counters:
| Metric | Labels | Description |
|---|---|---|
| cart_queries_total | query_type, status | Total queries processed (rag, agent, comparative, entity_link) |
| cart_collection_hits_total | collection | Total hits by collection name |
| cart_llm_tokens_total | direction | LLM tokens used (prompt, completion) |
Gauges:
| Metric | Labels | Description |
|---|---|---|
| cart_active_connections | -- | Current active API connections |
| cart_collection_size | collection | Current record count per collection |
| cart_last_ingest_timestamp | source | Unix timestamp of last ingest run (pubmed, clinical_trials) |
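Prometheus histograms are cumulative: an observation increments every bucket whose upper edge it does not exceed, plus an implicit +Inf bucket. The sketch below (bucket_counts is a hypothetical illustration, not a function from src/metrics.py) shows how the cart_query_latency_seconds edges above would count three sample latencies:

```python
import math

# Bucket edges from cart_query_latency_seconds above.
BUCKETS = [0.1, 0.5, 1, 2, 5, 10, 30]

def bucket_counts(observations):
    """Count observations into cumulative le=<edge> buckets, Prometheus-style."""
    counts = {edge: 0 for edge in BUCKETS}
    counts[math.inf] = 0  # the implicit +Inf bucket counts every observation
    for obs in observations:
        for edge in BUCKETS:
            if obs <= edge:
                counts[edge] += 1
        counts[math.inf] += 1
    return counts

counts = bucket_counts([0.3, 1.23, 7.0])
# Cumulative counts: le=0.5 sees only 0.3; le=2 adds 1.23; le=10 adds 7.0.
print(counts[0.5], counts[2], counts[10], counts[math.inf])  # 1 2 3 3
```

This is why bucket choice matters: with these edges, anything slower than 30 seconds lands only in +Inf and its tail latency becomes invisible to percentile queries.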
Helper Functions¶
from src.metrics import record_query, record_collection_hits, update_collection_sizes, get_metrics_text
# Record a completed query
record_query(query_type="rag", latency=1.23, hit_count=15, status="success")
# Record per-collection hit counts
record_collection_hits({"cart_literature": 5, "cart_trials": 3, "cart_safety": 2})
# Update collection sizes (typically after get_collection_stats())
update_collection_sizes({"cart_literature": 350, "cart_trials": 200, ...})
# Get Prometheus exposition text for /metrics endpoint
text = get_metrics_text()
Graceful Degradation¶
If prometheus_client is not installed, all metric objects become no-op stubs. The application continues to function without metrics:
try:
from prometheus_client import Counter, Gauge, Histogram, generate_latest
_PROMETHEUS_AVAILABLE = True
except ImportError:
_PROMETHEUS_AVAILABLE = False
# ... no-op stubs ...
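The elided stubs can be pictured as a single class whose methods accept anything and do nothing, aliased to the prometheus_client names. The sketch below is a hypothetical reconstruction (the actual stubs in src/metrics.py may differ in detail), showing why calling code needs no availability checks:

```python
class _NoopMetric:
    """Stand-in for Counter/Gauge/Histogram when prometheus_client is absent."""

    def __init__(self, *args, **kwargs):
        pass  # accept any metric name, docstring, and label names

    def labels(self, *args, **kwargs):
        return self  # chainable, so .labels(...).inc() still works

    def inc(self, *args, **kwargs):
        pass

    def observe(self, *args, **kwargs):
        pass

    def set(self, *args, **kwargs):
        pass

Counter = Gauge = Histogram = _NoopMetric

# Instrumented code runs unchanged; the calls simply record nothing.
Counter("cart_queries_total", "Total queries", ["query_type"]).labels(query_type="rag").inc()
```

Because labels() returns self, the fluent call chains used throughout the metrics module work identically whether the real library or the stubs are in place.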
API-Level Metrics¶
The FastAPI /metrics endpoint provides basic request counters even without prometheus_client:
cart_api_requests_total 147
cart_api_query_requests_total 42
cart_api_search_requests_total 89
cart_api_find_related_requests_total 12
cart_api_errors_total 4
cart_collection_vectors{collection="cart_literature"} 350
These are simple in-memory Python dict counters incremented on each request. They reset when the process restarts.
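A fallback like this needs only a dict and a formatter that emits one "name value" line per counter, matching the sample output above. The sketch below is an assumed reconstruction (bump and render_metrics are illustrative names, not the API's), not the actual code in api/main.py:

```python
# In-memory counters; process-local, so they reset on restart.
_counters = {
    "cart_api_requests_total": 0,
    "cart_api_query_requests_total": 0,
    "cart_api_errors_total": 0,
}

def bump(name: str, amount: int = 1) -> None:
    """Increment a counter, creating it on first use."""
    _counters[name] = _counters.get(name, 0) + amount

def render_metrics() -> str:
    """Render counters in Prometheus exposition format: one 'name value' line each."""
    return "\n".join(f"{name} {value}" for name, value in _counters.items())

# Simulate two requests, one of which is a /query call.
bump("cart_api_requests_total")
bump("cart_api_query_requests_total")
bump("cart_api_requests_total")
print(render_metrics())
```

The labeled line in the sample output (cart_collection_vectors{collection="cart_literature"}) would need a keyed dict per label set, but the principle is the same: plain Python state rendered on demand.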
Generated by HCLS AI Factory -- CAR-T Intelligence Agent | Author: Adam Jones | March 2026