Error handling, retry logic, budget limits, structured logging, and running agents as background services. ~400 lines of Python. Production-ready.
A production-grade agent wrapper with: exponential backoff retry on rate limits, per-task and per-session cost tracking with budget enforcement, structured JSON logging of every decision and tool call, model routing (cheap model for simple tasks, expensive model for complex ones), and a background task queue for running agents as services.
This lesson wraps everything we've built with production guardrails.
import anthropic
import json
import logging
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional

# ── Structured logging setup ───────────────────────

# Attributes present on every LogRecord.  Anything beyond these was injected
# by a caller via `extra=` and should be emitted as structured JSON fields.
_STANDARD_RECORD_ATTRS = frozenset(
    vars(logging.LogRecord("", 0, "", 0, "", (), None))
) | {"message", "asctime", "taskName"}


class JSONFormatter(logging.Formatter):
    """Render each log record as a single JSON object per line."""

    def format(self, record: logging.LogRecord) -> str:
        log = {
            # Timezone-aware UTC timestamp (datetime.utcnow() is deprecated).
            "ts": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "msg": record.getMessage(),
        }
        # BUG FIX: `logger.info(..., extra={...})` copies each key onto the
        # record as its own attribute — there is never a `record.extra` dict,
        # so the original `hasattr(record, "extra")` check silently dropped
        # every structured field.  Collect all non-standard attributes instead.
        for key, value in record.__dict__.items():
            if key not in _STANDARD_RECORD_ATTRS:
                log[key] = value
        # default=str keeps logging from crashing on non-JSON-native values.
        return json.dumps(log, default=str)


def setup_logger(name: str) -> logging.Logger:
    """Return a logger named *name* that appends JSON lines to agent.log."""
    logger = logging.getLogger(name)
    logger.setLevel(logging.INFO)
    # Guard against duplicate handlers when this module is imported twice
    # (e.g. under a reloader) — otherwise every record is written N times.
    if not logger.handlers:
        handler = logging.FileHandler("agent.log")
        handler.setFormatter(JSONFormatter())
        logger.addHandler(handler)
    return logger


logger = setup_logger("agent")

# ── Cost tracking ──────────────────────────────────

# Prices per million tokens (as of April 2026)
MODEL_COSTS = {
    "claude-haiku-4-5": {"input": 0.80, "output": 4.00},
    "claude-sonnet-4-5": {"input": 3.00, "output": 15.00},
    "claude-opus-4-5": {"input": 15.00, "output": 75.00},
}


@dataclass
class CostTracker:
    """Accumulates API spend and enforces a hard budget ceiling."""

    budget_usd: float = 1.0   # hard spending limit for this tracker
    spent_usd: float = 0.0    # running total across all recorded calls
    calls: int = 0            # number of API calls recorded

    def record(self, model: str, input_tokens: int, output_tokens: int) -> float:
        """Add one call's cost to the running total; return that call's cost."""
        # Unknown models are billed at Sonnet rates as a conservative default.
        costs = MODEL_COSTS.get(model, MODEL_COSTS["claude-sonnet-4-5"])
        cost = (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000
        self.spent_usd += cost
        self.calls += 1
        return cost

    def check_budget(self) -> None:
        """Raise RuntimeError once spending has reached the budget."""
        if self.spent_usd >= self.budget_usd:
            raise RuntimeError(
                f"Budget exceeded: ${self.spent_usd:.4f} / ${self.budget_usd:.2f}"
            )


# ── Retry wrapper with exponential backoff ─────────

def call_with_retry(client, cost_tracker: CostTracker, **kwargs):
    """Calls Claude with automatic retry on rate limits.

    Enforces the budget before the call, records cost after it, and logs a
    structured "api_call" event.  Re-raises RateLimitError after the final
    attempt; any other APIError is logged and re-raised immediately.
    """
    cost_tracker.check_budget()
    max_retries = 3
    for attempt in range(max_retries):
        try:
            resp = client.messages.create(**kwargs)
        except anthropic.RateLimitError:
            # BUG FIX: the original slept the full backoff *before* checking
            # whether this was the last attempt, adding a useless final delay
            # before re-raising.  Check first, then back off.
            if attempt == max_retries - 1:
                raise
            wait = 2 ** attempt  # 1s, then 2s
            print(f"Rate limited. Waiting {wait}s (attempt {attempt+1}/{max_retries})")
            time.sleep(wait)
            continue
        except anthropic.APIError as e:
            logger.error(f"API error: {e}", extra={"error": str(e)})
            raise
        cost = cost_tracker.record(
            kwargs["model"], resp.usage.input_tokens, resp.usage.output_tokens
        )
        logger.info("api_call", extra={
            "model": kwargs["model"],
            "tokens_in": resp.usage.input_tokens,
            "tokens_out": resp.usage.output_tokens,
            "cost_usd": round(cost, 6),
            "total_spent": round(cost_tracker.spent_usd, 6),
        })
        return resp


# ── Model routing ──────────────────────────────────

def route_model(task_complexity: str) -> str:
    """Route to cheaper models for simple tasks."""
    if task_complexity == "simple":
        return "claude-haiku-4-5"   # ~19x cheaper than Opus per token
    elif task_complexity == "medium":
        return "claude-sonnet-4-5"  # balanced
    else:
        return "claude-opus-4-5"    # complex reasoning


# ── Production agent class ─────────────────────────

class ProductionAgent:
    """Agent wrapper with budget enforcement, retries, and JSON logging."""

    def __init__(self, budget_usd: float = 1.0, tools: list = None):
        self.client = anthropic.Anthropic()
        self.cost = CostTracker(budget_usd=budget_usd)
        self.tools = tools or []
        # BUG FIX: UUID objects are not subscriptable — the original
        # `str(uuid.uuid4()[:8])` raised TypeError.  Stringify, then slice.
        self.session_id = str(uuid.uuid4())[:8]

    def run(self, task: str, tool_executor=None, complexity: str = "medium",
            max_steps: int = 10) -> str:
        """Run the agent loop on *task* and return the final text answer.

        tool_executor(name, input_dict) is invoked for each tool_use block;
        its exceptions are captured and fed back to the model as tool errors.
        Raises RuntimeError if the budget is exhausted mid-task.
        """
        task_id = str(uuid.uuid4())[:8]  # BUG FIX: stringify before slicing
        model = route_model(complexity)
        logger.info("task_start", extra={
            "session": self.session_id, "task_id": task_id,
            "model": model, "task": task[:100],
        })
        messages = [{"role": "user", "content": task}]
        start_time = time.time()
        try:
            for step in range(max_steps):
                resp = call_with_retry(
                    self.client, self.cost,
                    model=model, max_tokens=2048,
                    tools=self.tools, messages=messages,
                )
                if resp.stop_reason == "end_turn":
                    # Join all text blocks: content[0] is not guaranteed to
                    # be a text block after a tool-use exchange.
                    answer = "".join(b.text for b in resp.content if b.type == "text")
                    logger.info("task_complete", extra={
                        "task_id": task_id,
                        "steps": step + 1,
                        "duration_s": round(time.time() - start_time, 2),
                        "total_cost": round(self.cost.spent_usd, 6),
                    })
                    return answer
                # Handle tool calls
                results = []
                for b in resp.content:
                    if b.type != "tool_use":
                        continue
                    logger.info("tool_call", extra={
                        "task_id": task_id, "tool": b.name, "step": step,
                        "input_keys": list(b.input.keys()),
                    })
                    try:
                        result = tool_executor(b.name, b.input)
                    except Exception as e:
                        result = f"Tool error: {e}"
                        logger.warning(
                            f"Tool {b.name} failed", extra={"error": str(e)}
                        )
                    results.append({
                        "type": "tool_result",
                        "tool_use_id": b.id,
                        "content": str(result),
                    })
                messages += [
                    {"role": "assistant", "content": resp.content},
                    {"role": "user", "content": results},
                ]
            return "Max steps reached."
        except RuntimeError:  # budget exceeded
            logger.error(f"Budget exceeded on task {task_id}")
            raise

    def usage_summary(self) -> dict:
        """Return call count, spend, budget, and remaining budget."""
        return {
            "calls": self.cost.calls,
            "spent_usd": round(self.cost.spent_usd, 6),
            "budget_usd": self.cost.budget_usd,
            "remaining_usd": round(self.cost.budget_usd - self.cost.spent_usd, 6),
        }


# ── Test it ────────────────────────────────────────
if __name__ == "__main__":
    agent = ProductionAgent(budget_usd=0.10)  # $0.10 limit
    result = agent.run(
        "Explain the difference between supervised and unsupervised learning.",
        complexity="simple",  # routes to Haiku (cheap)
    )
    print("Answer:", result)
    print("Usage:", agent.usage_summary())
    # Check agent.log for structured JSON logs
Read the log file: After running, open agent.log. Every API call, tool call, and task completion is logged as structured JSON. This makes debugging production agents tractable — you can see exactly which step failed, how many tokens it used, and what it cost.
For production, you often want agents to run in the background — accepting tasks, executing them asynchronously, and returning results. Here's a minimal pattern:
import queue
import threading
import time
import uuid

from agent_production import ProductionAgent


class AgentService:
    """Minimal background service: daemon workers drain a shared task queue."""

    def __init__(self, workers: int = 2):
        self.task_queue = queue.Queue()
        self.results = {}  # task_id -> {"status": ..., "result"/"error": ...}
        for _ in range(workers):
            t = threading.Thread(target=self._worker, daemon=True)
            t.start()

    def _worker(self):
        """Loop forever, executing queued tasks with a fresh agent for each."""
        while True:
            task_id, task, kwargs = self.task_queue.get()
            try:
                # A fresh agent per task gives each task its own budget.
                agent = ProductionAgent(budget_usd=0.50)
                result = agent.run(task, **kwargs)
                self.results[task_id] = {"status": "done", "result": result}
            except Exception as e:
                # Surface failures through the results dict instead of
                # killing the worker thread.
                self.results[task_id] = {"status": "error", "error": str(e)}
            finally:
                self.task_queue.task_done()

    def submit(self, task: str, **kwargs) -> str:
        """Queue *task*; returns a short id for polling via get_result()."""
        # BUG FIX: UUID objects are not subscriptable — the original
        # `str(uuid.uuid4()[:8])` raised TypeError.  Stringify, then slice.
        task_id = str(uuid.uuid4())[:8]
        self.results[task_id] = {"status": "pending"}
        self.task_queue.put((task_id, task, kwargs))
        return task_id

    def get_result(self, task_id: str) -> dict:
        """Return the task's result dict, or {"status": "not_found"}."""
        return self.results.get(task_id, {"status": "not_found"})


# Usage:
if __name__ == "__main__":
    svc = AgentService(workers=2)
    # Submit multiple tasks simultaneously
    ids = [
        svc.submit("What is machine learning?", complexity="simple"),
        svc.submit("Explain transformer architecture.", complexity="medium"),
    ]
    # Poll for results
    while any(svc.get_result(i)["status"] == "pending" for i in ids):
        time.sleep(1)
    for tid in ids:
        print(tid, svc.get_result(tid)["status"])
You've built a basic agent, a 5-tool agent, a memory agent, a multi-agent research system, and a production-grade agent with error handling, cost controls, logging, and background deployment. That's a production-ready foundation. Most people never get here.
Take the Live Bootcamp — $1,490. Before moving on, make sure you can answer these without looking: