Python SDK
The agentverse-memory package wraps the Agentverse Memory MCP API in an idiomatic async/sync Python client. Pure Python wheel — no native dependencies, no build tools. Python 3.10+ required.
1. Installation
    pip install agentverse-memory

Optional extras for framework integrations:
    pip install "agentverse-memory[langchain]"  # LangChain integration
    pip install "agentverse-memory[crewai]"     # CrewAI integration
    pip install "agentverse-memory[all]"        # All integrations

    # Verify
    python -c "import agentverse_memory; print(agentverse_memory.__version__)"

2. Authentication
Get a free API key at memory.agentverse.ai. Explorer tier is free, includes graph memory, no credit card required.
Set your key as an environment variable — never hardcode it:
    export AVMEM_API_KEY="avmem_sk_your_key_here"

The client picks it up automatically:
    from agentverse_memory import MemoryClient

    # Reads AVMEM_API_KEY from environment automatically
    client = MemoryClient(agent_id="my-agent")

    # Or pass explicitly (useful for multi-tenant setups)
    client = MemoryClient(
        api_key="avmem_sk_your_key_here",
        agent_id="my-agent",
        base_url="https://am-server-jbneh74b5q-uc.a.run.app",
        timeout=10.0,  # seconds
    )

agent_id is any stable string. Use the same ID across sessions so memories accumulate. Suggested format: service-role-tenant (e.g. crm-assistant-acme).
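To keep IDs consistent across services, the suggested service-role-tenant format can be generated by a small helper. The sketch below is illustrative only: make_agent_id is a hypothetical function, not part of the SDK, and the validation rule (lowercase alphanumerics and underscores, no hyphens inside a part) is an assumption chosen so the ID stays easy to split. The SDK itself accepts any stable string.

```python
import re

# Assumption: each part is lowercase alphanumerics, optionally joined by underscores.
# Hyphens are reserved as the separator between parts.
_PART = re.compile(r"[a-z0-9]+(?:_[a-z0-9]+)*")


def make_agent_id(service: str, role: str, tenant: str) -> str:
    """Build an ID in the suggested service-role-tenant format (hypothetical helper)."""
    parts = [p.strip().lower() for p in (service, role, tenant)]
    for part in parts:
        if not _PART.fullmatch(part):
            raise ValueError(f"invalid agent_id part: {part!r}")
    return "-".join(parts)


if __name__ == "__main__":
    print(make_agent_id("CRM", "assistant", "Acme"))
```

The result can be passed directly as the agent_id argument when constructing a client.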
3. Your First 5 Minutes
1. Store an Episode
Episodic memories are time-stamped events — what your agent did, observed, or was told.
    from agentverse_memory import MemoryClient

    client = MemoryClient(agent_id="research-agent-1")

    episode = client.episodes.store(
        content="User asked for Q1 revenue breakdown by region. "
                "Retrieved from Salesforce. Delivered 3-bullet summary via Slack.",
        metadata={
            "user_id": "user_42",
            "session_id": "sess_abc123",
            "tags": ["finance", "q1", "slack"],
        },
        importance=0.8,  # 0.0–1.0; higher = pheromone boost on retrieval
    )

    print(episode.id)          # ep_01hxq7kn3mabc
    print(episode.created_at)  # 2026-05-14T08:00:00.042Z
    # No LLM calls at write time: 0.035ms p95 in-process engine write op (am-local)

2. Retrieve by Natural Language
    results = client.episodes.query(
        query="What did the user ask about sales?",
        limit=5,
        min_score=0.4,
    )

    for r in results:
        print(f"[{r.score:.2f}] {r.content[:80]}")
        print(f"  ↳ {r.metadata.get('tags', [])}")
    # [0.94] User asked for Q1 revenue breakdown by region.
    #   ↳ ['finance', 'q1', 'slack']

Retrieval uses BM25 + TF-IDF hybrid search with pheromone weighting, so frequently retrieved memories rise in the ranking automatically.
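For intuition about how pheromone weighting can lift frequently retrieved memories, here is a toy in-memory sketch. It is not the service's ranking code: it uses a simplified TF-IDF score only (no BM25), and the ToyMemoryIndex class, the deposit value, and the multiplicative (1 + pheromone) boost are all assumptions made for illustration.

```python
import math
import re
from collections import Counter


def tokenize(text):
    """Lowercase word tokens."""
    return re.findall(r"[a-z0-9]+", text.lower())


class ToyMemoryIndex:
    """Simplified TF-IDF ranking with a per-document pheromone boost (illustrative)."""

    def __init__(self, deposit=0.1):
        self.docs = []        # token lists, one per stored memory
        self.pheromone = []   # boost accumulated by each memory
        self.deposit = deposit

    def add(self, text):
        self.docs.append(tokenize(text))
        self.pheromone.append(0.0)
        return len(self.docs) - 1

    def _idf(self, term):
        df = sum(1 for doc in self.docs if term in doc)
        return math.log((1 + len(self.docs)) / (1 + df)) + 1.0

    def score(self, query, doc_id):
        tokens = self.docs[doc_id]
        counts = Counter(tokens)
        length = len(tokens) or 1
        base = sum((counts[t] / length) * self._idf(t) for t in set(tokenize(query)))
        # Assumed boost: memories that were retrieved before score higher.
        return base * (1.0 + self.pheromone[doc_id])

    def query(self, query, limit=5):
        ranked = sorted(
            ((self.score(query, i), i) for i in range(len(self.docs))),
            reverse=True,
        )
        hits = [(s, i) for s, i in ranked if s > 0][:limit]
        for _, i in hits:
            self.pheromone[i] += self.deposit  # reinforce what was retrieved
        return hits


if __name__ == "__main__":
    index = ToyMemoryIndex()
    a = index.add("revenue breakdown by region")
    b = index.add("revenue summary for slack")
    index.query("region")            # only memory a matches, so only a is reinforced
    print(index.query("revenue"))    # a now outranks b for an otherwise tied query
```

In this sketch two memories that match a query equally well are separated only by retrieval history, which is the effect the pheromone term is meant to illustrate.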
3. Store a Semantic Fact
    # Store a knowledge triple (subject, predicate, object)
    fact = client.facts.store(
        subject="Acme Corp",
        predicate="signed_contract_with",
        object_="Fetch.ai",  # object_ avoids Python keyword conflict
        metadata={
            "valid_at": "2026-01-15",
            "confidence": 0.95,
            "source": "CRM",
        },
    )
    print(fact.id)  # fact_02hyr9mp4n...

    # Connect more facts to build the graph
    client.facts.store(subject="Fetch.ai", predicate="headquartered_in", object_="London")

    # Query the graph with natural language
    hits = client.facts.query(query="Where is Fetch.ai based?", limit=3)
    for h in hits:
        print(f"{h.subject} → {h.predicate} → {h.object_}")
    # Fetch.ai → headquartered_in → London

Async Client
Use AsyncMemoryClient in async contexts (FastAPI, asyncio-based agents).
    import asyncio
    from agentverse_memory import AsyncMemoryClient

    async def main():
        async with AsyncMemoryClient(agent_id="async-agent") as client:
            # Store a semantic fact
            fact = await client.facts.store(
                subject="Acme Corp",
                predicate="signed_contract_with",
                object_="Fetch.ai",
                metadata={"valid_at": "2026-01-15", "confidence": 0.95},
            )

            # Traverse the knowledge graph from a seed node
            graph = await client.facts.traverse_graph(
                seed_node="Acme Corp",
                max_depth=2,
                limit=20,
            )
            for node in graph.nodes:
                print(f"{node.subject} --[{node.predicate}]--> {node.object_}")

    asyncio.run(main())

LangChain Integration (agentverse-memory[langchain])
The extra ships AvmemChatMessageHistory (a BaseChatMessageHistory implementation), AvmemMemory, and AvmemRetriever for drop-in LangChain integration.
    from langchain.chains import ConversationChain
    from langchain_openai import ChatOpenAI
    from agentverse_memory.integrations.langchain import (
        AvmemChatMessageHistory,
        AvmemMemory,
    )

    # Session-scoped episodic history; persists across process restarts
    history = AvmemChatMessageHistory(
        agent_id="langchain-assistant",
        session_id="sess_abc123",
    )

    memory = AvmemMemory(history=history)

    chain = ConversationChain(
        llm=ChatOpenAI(model="gpt-4o"),
        memory=memory,
    )

    # First session
    chain.predict(input="Acme Corp just renewed their contract for $420K.")

    # Second session: new process, same memory
    # chain.predict(input="What's Acme's contract value?")
    # → "Acme Corp renewed their contract for $420K."

For RAG chains, use AvmemRetriever as a drop-in for any LangChain retriever:
    from agentverse_memory.integrations.langchain import AvmemRetriever
    from langchain.chains import RetrievalQA
    from langchain_openai import ChatOpenAI  # needed for the llm argument below

    retriever = AvmemRetriever(
        agent_id="langchain-assistant",
        memory_types=["episodic", "semantic"],  # query both stores
        limit=8,
    )

    qa_chain = RetrievalQA.from_chain_type(
        llm=ChatOpenAI(model="gpt-4o"),
        retriever=retriever,
    )

    answer = qa_chain.run("What contracts did Acme Corp renew recently?")

CrewAI Integration (agentverse-memory[crewai])
    from crewai import Agent, Task, Crew
    from agentverse_memory.integrations.crewai import AgentverseMemoryTool

    # Each CrewAI agent gets its own memory namespace
    memory_tool = AgentverseMemoryTool(agent_id="crew-research-agent")

    researcher = Agent(
        role="Senior Research Analyst",
        goal="Research competitors and synthesize findings into actionable insights",
        backstory="You are a thorough analyst who never forgets a finding.",
        tools=[memory_tool],
    )

    task = Task(
        description=(
            "Research Mem0's pricing changes from April 2026. "
            "Store each key finding as a memory. "
            "Query past research before starting so you don't repeat work."
        ),
        agent=researcher,
    )

    crew = Crew(agents=[researcher], tasks=[task])
    result = crew.kickoff()

AgentverseMemoryTool exposes read and write through a single natural-language interface; the agent decides when to store and when to query based on context.
Memory Types
    # Episodic memory
    ep = client.episodes.store(content="...", metadata={...}, importance=0.8)
    results = client.episodes.query(query="...", limit=5, min_score=0.4)
    episodes = client.episodes.list(limit=50, offset=0)
    ep = client.episodes.get(episode_id="ep_01hx...")
    client.episodes.update(episode_id="ep_01hx...", content="updated content")
    client.episodes.delete(episode_id="ep_01hx...")
    timeline = client.episodes.search_timeline(
        start="2026-01-01T00:00:00Z",
        end="2026-03-31T23:59:59Z",
    )
    client.episodes.consolidate(ids=["ep_01hx...", "ep_02hy..."])

    # Semantic memory (facts)
    fact = client.facts.store(
        subject="Acme Corp",
        predicate="signed_contract_with",
        object_="Fetch.ai",
        metadata={"confidence": 0.95},
    )
    facts = client.facts.query(query="What contracts does Acme Corp have?", limit=10)

    # Graph traversal (A* with pheromone weights), free on all tiers
    graph = client.facts.traverse_graph(seed_node="Acme Corp", max_depth=2)
    path = client.facts.find_path(source="Acme Corp", target="London")
    print(" → ".join(n.label for n in path.nodes))  # Acme Corp → Fetch.ai → London

    # Procedural memory
    proc = client.procedures.store(
        goal="Generate a sales report",
        steps=[
            "1. Query Salesforce for deals closed in date range",
            "2. Group by region and sales rep",
            "3. Calculate totals and YoY delta",
            "4. Format as markdown table",
        ],
        metadata={"category": "reporting"},
    )
    match = client.procedures.lookup(goal="Create a monthly sales summary")
    print(match.steps)

Working memory is an ephemeral key-value store with optional TTL. Use it for active task state you don't want in the episodic log.
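As a mental model for the TTL semantics just described, here is a small local sketch of a key-value store with expiry. ToyWorkingMemory is a hypothetical class, not the SDK's implementation; it only mirrors the documented convention that ttl_seconds=0 means no expiry. The injectable clock is an assumption added so the behavior can be tested without sleeping.

```python
import time
from typing import Any, Callable, Dict, List, Optional, Tuple


class ToyWorkingMemory:
    """In-memory key-value store with optional per-key TTL (illustrative only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._clock = clock
        # key -> (value, expiry timestamp or None for "never expires")
        self._items: Dict[str, Tuple[Any, Optional[float]]] = {}

    def set(self, key: str, value: Any, ttl_seconds: float = 0) -> None:
        # ttl_seconds == 0 means the key does not expire.
        expires_at = self._clock() + ttl_seconds if ttl_seconds > 0 else None
        self._items[key] = (value, expires_at)

    def _purge(self) -> None:
        """Drop every key whose expiry time has passed."""
        now = self._clock()
        expired = [
            k for k, (_, exp) in self._items.items()
            if exp is not None and now >= exp
        ]
        for k in expired:
            del self._items[k]

    def get(self, key: str, default: Any = None) -> Any:
        self._purge()
        item = self._items.get(key)
        return default if item is None else item[0]

    def keys(self) -> List[str]:
        self._purge()
        return sorted(self._items)

    def clear(self) -> None:
        self._items.clear()


if __name__ == "__main__":
    wm = ToyWorkingMemory()
    wm.set("current_task", {"step": 3, "total_steps": 8}, ttl_seconds=3600)
    print(wm.get("current_task"))
```

Expiry here is lazy: stale keys are removed the next time the store is read, which keeps the sketch free of background timers.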
    # Store active task context at the start of a task
    client.working.set(
        key="current_task",
        value={
            "task_id": "task_99",
            "objective": "Summarize Q1 board report",
            "step": 3,
            "total_steps": 8,
        },
        ttl_seconds=3600,  # auto-expires; 0 = no expiry
    )

    task = client.working.get(key="current_task")
    print(f"Step {task.value['step']} of {task.value['total_steps']}")  # Step 3 of 8

    # Update mid-task
    client.working.set(key="current_task", value={**task.value, "step": 4})

    keys = client.working.list()  # list all active keys
    client.working.clear()        # clear on completion (TTL handles cleanup too)

Shared Spaces (Builder+)
Multiple agents share a common memory graph. An orchestrator creates the space; workers join via invite token.
    # Orchestrator creates a shared memory space
    space = client.spaces.create(name="research-team-memory")
    print(space.id, space.invite_token)

    # Worker agent joins the space
    worker = MemoryClient(api_key="...", agent_id="worker-agent-1")
    worker.spaces.join(space_id=space.id, invite_token=space.invite_token)

    # Any agent in the space queries the shared knowledge graph
    results = client.spaces.query(
        space_id=space.id,
        query="What did the team discover about Acme Corp?",
        limit=10,
    )

Pheromone Graph Traversal (Phase 3 Preview)
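As background for the calls in this section, the following toy sketch shows what a depth-limited traversal from a seed node over weighted triples can look like. It is a plain breadth-first search that visits higher-weight edges first, not the service's A* implementation. The Triple type, the traverse function, and the London/located_in/UK edge are hypothetical and exist only for this example.

```python
from collections import deque
from typing import Dict, List, NamedTuple


class Triple(NamedTuple):
    subject: str
    predicate: str
    object_: str
    weight: float = 1.0  # stand-in for a pheromone weight


def traverse(triples: List[Triple], seed: str, max_depth: int = 2, limit: int = 20) -> List[Triple]:
    """Breadth-first walk over outgoing edges, strongest edges first (illustrative)."""
    out_edges: Dict[str, List[Triple]] = {}
    for t in triples:
        out_edges.setdefault(t.subject, []).append(t)

    visited = {seed}
    queue = deque([(seed, 0)])
    found: List[Triple] = []
    while queue and len(found) < limit:
        node, depth = queue.popleft()
        if depth >= max_depth:
            continue  # do not expand nodes at the depth limit
        edges = sorted(out_edges.get(node, []), key=lambda e: e.weight, reverse=True)
        for edge in edges:
            if len(found) >= limit:
                break
            found.append(edge)
            if edge.object_ not in visited:
                visited.add(edge.object_)
                queue.append((edge.object_, depth + 1))
    return found


if __name__ == "__main__":
    triples = [
        Triple("Acme Corp", "signed_contract_with", "Fetch.ai", 0.847),
        Triple("Fetch.ai", "headquartered_in", "London", 0.612),
        Triple("London", "located_in", "UK", 0.5),  # hypothetical extra edge
    ]
    for t in traverse(triples, "Acme Corp", max_depth=2):
        print(f"{t.subject} --[{t.predicate}]--> {t.object_} (weight: {t.weight:.3f})")
```

With max_depth=2 the walk stops at London, so the hypothetical third edge is never returned.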
    # Traverse from a seed concept over pheromone-weighted edges
    graph = client.facts.traverse_graph(seed_node="Acme Corp", max_depth=2, limit=20)

    for node in graph.nodes:
        print(f"{node.subject} --[{node.predicate}]--> {node.object_} "
              f"(weight: {node.pheromone_weight:.3f})")
    # Acme Corp --[signed_contract_with]--> Fetch.ai (weight: 0.847)
    # Fetch.ai --[headquartered_in]--> London (weight: 0.612)

    # A* pathfinding between any two concepts (Builder tier and above)
    path = client.facts.find_path(source="Acme Corp", target="London", max_depth=4)
    print(" → ".join(n.label for n in path.nodes))  # Acme Corp → Fetch.ai → London
    print(f"Pheromone path score: {path.total_weight:.3f}")

Error Handling & Retry
    import time

    from agentverse_memory import MemoryClient
    from agentverse_memory.exceptions import (
        AuthError,       # 401: invalid or missing API key
        ForbiddenError,  # 403: feature not on your plan
        NotFoundError,   # 404: resource does not exist
        RateLimitError,  # 429: monthly or burst limit exceeded
        ServerError,     # 500/503: server-side fault
        AvmemError,      # base class for all SDK errors
    )

    client = MemoryClient(agent_id="my-agent")

    try:
        episode = client.episodes.store(content="Important event.")
    except AuthError:
        print("Invalid API key. Check AVMEM_API_KEY.")
    except ForbiddenError as e:
        print(f"Feature requires upgrade: {e.required_plan}")
    except RateLimitError as e:
        print(f"Rate limited. Retry after {e.retry_after}s")
        time.sleep(e.retry_after)
        episode = client.episodes.store(content="Important event.")
    except AvmemError as e:
        print(f"Unexpected error [{e.code}]: {e}")

Built-in retry via constructor (recommended for production):
    client = MemoryClient(
        agent_id="my-agent",
        max_retries=3,      # retry on 429 + 5xx
        retry_backoff=2.0,  # exponential factor (2^attempt seconds)
    )
    # All calls now automatically retry transient errors

Method Reference
All 35 methods map 1:1 to MCP tools. See the API Reference for full parameter schemas.
| Method | Description |
|---|---|
| client.episodes.store(...) | Store a time-stamped episodic event |
| client.episodes.query(...) | Natural-language search over episodes |
| client.episodes.get(id) | Retrieve episode by ID |
| client.episodes.list(...) | Paginated list of episodes |
| client.episodes.update(id, ...) | Update episode content or metadata |
| client.episodes.delete(id) | Delete an episode |
| client.episodes.search_timeline(...) | Time-windowed search |
| client.episodes.consolidate(ids) | Merge related episodes |
| client.facts.store(...) | Store a knowledge triple (subject, predicate, object) |
| client.facts.query(...) | Natural-language graph search |
| client.facts.get(id) | Retrieve fact by ID |
| client.facts.traverse_graph(...) | Multi-hop pheromone-weighted graph traversal |
| client.facts.find_path(...) | A* shortest path between two concepts |
| client.facts.update(id, ...) | Update a fact |
| client.facts.delete(id) | Delete a fact |
| client.procedures.store(...) | Store a skill/workflow definition |
| client.procedures.query(...) | Find procedures by goal description |
| client.procedures.get(id) | Retrieve procedure by ID |
| client.procedures.update(id, ...) | Update steps or log execution outcome |
| client.procedures.lookup(goal) | Find best procedure for a goal |
| client.working.set(key, value) | Set a working memory key (with optional TTL) |
| client.working.get(key) | Get a working memory value |
| client.working.list() | List all active working keys |
| client.working.clear() | Clear all working memory |
| client.spaces.create(name) | Create a shared memory space |
| client.spaces.join(space_id, token) | Join a shared space |
| client.spaces.query(space_id, query) | Query a shared space |
| client.health() | Check service health |
| client.stats() | Get usage statistics |
Configuration Reference
    from agentverse_memory import MemoryClient

    client = MemoryClient(
        api_key="avmem_sk_...",   # or AVMEM_API_KEY env var
        agent_id="my-agent",      # or AVMEM_AGENT_ID env var
        base_url="https://am-server-jbneh74b5q-uc.a.run.app",  # or AVMEM_BASE_URL env var
        timeout=10.0,             # request timeout in seconds
        max_retries=3,            # auto-retry on 5xx / 429
        retry_backoff=2.0,        # exponential factor
    )

The api_key, agent_id, and base_url parameters can also be set via environment variables: AVMEM_API_KEY, AVMEM_AGENT_ID, AVMEM_BASE_URL.
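One way to picture how explicit arguments and environment variables could combine is the sketch below. It assumes explicit arguments take precedence over the environment, which is a common convention but is not confirmed by this page. resolve_settings and ResolvedSettings are hypothetical names, not SDK APIs; only the three environment variable names come from the text above.

```python
import os
from dataclasses import dataclass
from typing import Mapping, Optional


@dataclass(frozen=True)
class ResolvedSettings:
    api_key: str
    agent_id: str
    base_url: Optional[str]  # None means "use whatever default the client has"


def resolve_settings(
    api_key: Optional[str] = None,
    agent_id: Optional[str] = None,
    base_url: Optional[str] = None,
    env: Mapping[str, str] = os.environ,
) -> ResolvedSettings:
    """Assumed precedence: explicit argument first, then environment variable."""

    def pick(explicit: Optional[str], var: str) -> Optional[str]:
        return explicit if explicit is not None else env.get(var)

    key = pick(api_key, "AVMEM_API_KEY")
    aid = pick(agent_id, "AVMEM_AGENT_ID")
    if not key:
        raise ValueError("api_key missing: pass it explicitly or set AVMEM_API_KEY")
    if not aid:
        raise ValueError("agent_id missing: pass it explicitly or set AVMEM_AGENT_ID")
    return ResolvedSettings(key, aid, pick(base_url, "AVMEM_BASE_URL"))


if __name__ == "__main__":
    demo_env = {"AVMEM_API_KEY": "avmem_sk_demo", "AVMEM_AGENT_ID": "env-agent"}
    print(resolve_settings(env=demo_env))
    print(resolve_settings(agent_id="crm-assistant-acme", env=demo_env))
```

Passing env as a parameter keeps the lookup testable and avoids mutating the real process environment.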
Next Steps
Join #sdk-python on Discord.