Antaris Analytics / docs

Documentation

Deterministic infrastructure for AI agents. Zero external dependencies across all packages.

SUITE v4.2.0 · 1,526 tests · Apache 2.0 · Python 3.9+
Packages
antaris-memory
v4.2.0 · 566 tests
File-based persistent memory with BM25 search, sharding, natural decay, and MCP server. 4.2: export/import API, GCS backend stub.
antaris-router
v4.1.0 · 194 tests
Adaptive model routing with TF-IDF classification, outcome learning, A/B testing, and SLA monitoring.
antaris-guard
v4.1.0 · 380 tests
Prompt injection detection, PII filtering, reputation tracking, rate limiting, and behavioral analysis.
antaris-context
v4.2.0 · 150 tests
Context window management with token budgets, overflow cascade, compression strategies, and relevance scoring. 4.2: hard budget enforcement, ContextBudgetExceeded exception.
antaris-pipeline
v4.2.0 · 215 tests
Agent orchestration pipeline with event-driven architecture, lifecycle hooks, and sub-agent coordination. 4.2: PipelineTelemetry, callback hooks, summary() method.
antaris-contracts
v4.2.0 · 52 tests
Versioned state schemas, failure semantics, concurrency model contracts, and debug CLI for the full suite.

Install

# Full suite
pip install antaris-memory antaris-router antaris-guard antaris-context antaris-pipeline antaris-contracts

# Individual packages
pip install antaris-memory   # memory only
pip install antaris-guard    # security only
pip install antaris-router   # routing only
pip install antaris-contracts # schemas + debug CLI
v4.2.0 — New API Reference
antaris-memory 4.2.0

MemorySystem.export(output_path, include_metadata=True) → int

# Export all memories to a versioned JSON file
count = memory.export("/path/to/backup.json")
count = memory.export("/path/to/backup.json", include_metadata=False)
Returns the number of memories exported. Writes a versioned JSON file containing the schema version, an export timestamp, and all memory entries.
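As a rough sketch, an exported file might be shaped like the dict below. All key names here are assumptions for illustration; the docs only guarantee that a schema version, a timestamp, and the memory entries are present.

```python
# Illustrative shape of an export file. Every key name below is an
# assumption — only "schema version, timestamp, entries" is documented.
backup = {
    "schema_version": "4.2.0",
    "exported_at": "2025-06-01T12:00:00Z",
    "memories": [
        {"key": "user:42:pref", "value": "prefers concise answers",
         "metadata": {"decay": 0.97}},
    ],
}
```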

MemorySystem.import_from(input_path, merge=True) → int

# Import memories with merge (default) or replace mode
count = memory.import_from("/path/to/backup.json") # merge
count = memory.import_from("/path/to/backup.json", merge=False) # replace
Returns the number of memories imported. In merge mode, duplicate entries are skipped (deduplicated by key). In replace mode, existing memories are cleared before import.
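The merge/replace semantics can be modeled with plain dicts. This is a minimal sketch of the behavior described above, not the antaris-memory implementation; it assumes the imported count excludes skipped duplicates.

```python
# Model of import_from() semantics using plain dicts (illustrative only).
def import_entries(existing, incoming, merge=True):
    """Return (store, imported_count) after applying import semantics."""
    if not merge:
        # Replace mode: existing memories are cleared first.
        return dict(incoming), len(incoming)
    # Merge mode: deduplicate by key — existing entries are kept.
    store = dict(existing)
    imported = 0
    for key, entry in incoming.items():
        if key not in store:
            store[key] = entry
            imported += 1
    return store, imported
```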

GCSMemoryBackend(bucket, prefix, credentials_path) stub — coming in 4.3

# GCS backend interface defined in 4.2 — full implementation in 4.3
from antaris_memory.backends import GCSMemoryBackend
backend = GCSMemoryBackend(
    bucket="my-agent-memory",
    prefix="agents/prod/",
    credentials_path="/path/to/service-account.json"
)
Google Cloud Storage backend stub. The interface is defined and stable as of 4.2. Full read/write implementation ships in 4.3.
antaris-context 4.2.0

ContextManager.render_hard_limited(budget_tokens) → list

# Hard budget enforcement — raises ContextBudgetExceeded if trimming
# cannot fit the budget
from antaris_context.exceptions import ContextBudgetExceeded

try:
    messages = ctx.render_hard_limited(budget_tokens=4096)
except ContextBudgetExceeded as e:
    print(f"Over budget: used {e.used} of {e.budget} tokens")
Unlike render(), this method enforces a strict token ceiling. It automatically trims low-priority messages to fit the budget, and raises ContextBudgetExceeded only when trimming cannot satisfy the constraint.
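The trim-then-raise behavior can be sketched as a priority-based drop loop. This is an assumed model, not the actual render_hard_limited() internals: the (priority, text) message shape, the pin_priority threshold, and the stand-in exception class are all illustrative.

```python
class ContextBudgetExceeded(Exception):
    # Stand-in mirroring the documented exception's attributes.
    def __init__(self, used, budget):
        super().__init__(f"used {used} of {budget} tokens")
        self.used = used
        self.budget = budget

def hard_limit(messages, budget_tokens, count_tokens, pin_priority=10):
    """messages: list of (priority, text). Messages with priority >=
    pin_priority are never trimmed. Drops the lowest-priority droppable
    message until the total fits the budget, else raises."""
    kept = sorted(messages, key=lambda m: m[0], reverse=True)
    used = lambda: sum(count_tokens(t) for _, t in kept)
    while used() > budget_tokens and kept and kept[-1][0] < pin_priority:
        kept.pop()  # drop the current lowest-priority message
    if used() > budget_tokens:
        raise ContextBudgetExceeded(used(), budget_tokens)
    return [t for _, t in kept]
```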

ContextBudgetExceeded(used, budget) exception

from antaris_context.exceptions import ContextBudgetExceeded

# Attributes:
e.used # int — tokens used by current context
e.budget # int — budget_tokens argument passed
Raised by render_hard_limited() when the context cannot be trimmed to fit within the requested budget.
antaris-pipeline 4.2.0

PipelineTelemetry dataclass

from antaris_pipeline.telemetry import PipelineTelemetry

# PipelineTelemetry fields:
telemetry.stage_timings # dict[str, float] — ms per stage
telemetry.total_ms # float — total pipeline time
telemetry.cost_estimate # float — estimated token cost
telemetry.turn_id # str — unique turn identifier
Per-turn telemetry dataclass. Automatically populated by the pipeline and passed to the telemetry callback if configured.

PipelineTelemetry.slowest_stage() → tuple[str, float]

stage_name, ms = telemetry.slowest_stage()
# e.g. ("memory_recall", 7.23)
Returns the (stage_name, duration_ms) tuple for the slowest stage in this turn. Useful for identifying bottlenecks.
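Given the documented stage_timings dict, the lookup reduces to a max over its items — a one-line sketch of the equivalent computation:

```python
# Equivalent computation over the documented stage_timings dict.
def slowest_stage(stage_timings: dict) -> tuple:
    """Return the (stage_name, duration_ms) pair with the largest duration."""
    return max(stage_timings.items(), key=lambda kv: kv[1])
```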

PipelineTelemetry.summary() → str

print(telemetry.summary())
# e.g. "turn abc123: 12.4ms total | slowest: memory_recall (7.2ms) | cost: $0.0004"
One-line human-readable summary of the turn's telemetry. Suitable for logging or monitoring dashboards.

Telemetry callback hook

# Configure real-time telemetry callback at pipeline init
pipeline = AntarisPipeline(
    ...,
    telemetry_callback=lambda t: logger.info(t.summary())
)

# Callback receives PipelineTelemetry after each turn
Optional callback invoked after each pipeline turn completes. Receives a PipelineTelemetry instance for real-time monitoring, alerting, or logging.