session-compression

@bobmatnyc/termpilot


SKILL.md

name: session-compression
description: AI session compression techniques for managing multi-turn conversations efficiently through summarization, embedding-based retrieval, and intelligent context management.

AI Session Compression Techniques

Summary

Compress long AI conversations to fit context windows while preserving critical information.

Session compression enables production AI applications to manage multi-turn conversations efficiently by reducing token usage by 70-95% through summarization, embedding-based retrieval, and intelligent context management. Achieve 3-20x compression ratios with minimal performance degradation.

Key Benefits:

  • Cost Reduction: 80-90% token cost savings through hierarchical memory
  • Performance: 2x faster responses with compressed context
  • Scalability: Handle conversations exceeding 1M tokens
  • Quality: Preserve critical information with <2% accuracy loss

When to Use

Use session compression when:

  • Multi-turn conversations approach context window limits (>50% capacity)
  • Long-running chat sessions (customer support, tutoring, code assistants)
  • Token costs become significant (high-volume applications)
  • Response latency increases due to large context
  • Managing conversation history across multiple sessions

Don't use when:

  • Short conversations (<10 turns) fitting easily in context
  • Every detail must be preserved verbatim (legal, compliance)
  • Single-turn or stateless interactions
  • Context window usage is <30%

Ideal scenarios:

  • Chatbots with 50+ turn conversations
  • AI code assistants tracking long development sessions
  • Customer support with multi-session ticket history
  • Educational tutors with student progress tracking
  • Multi-day collaborative AI workflows

Quick Start

Basic Setup with LangChain

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

# Initialize Claude client
llm = ChatAnthropic(
    model="claude-3-5-sonnet-20241022",
    api_key="your-api-key"
)

# Setup memory with automatic summarization
memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding this
    return_messages=True
)

# Add conversation turns
memory.save_context(
    {"input": "What's session compression?"},
    {"output": "Session compression reduces conversation token usage..."}
)

# Retrieve compressed context
context = memory.load_memory_variables({})

Progressive Compression Pattern

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class ProgressiveCompressor:
    def __init__(self, thresholds=[0.70, 0.85, 0.95]):
        self.thresholds = thresholds
        self.messages = []
        self.max_tokens = 200000  # Claude context window

    def add_message(self, role: str, content: str):
        self.messages.append({"role": role, "content": content})

        # Check if compression needed
        current_usage = self._estimate_tokens()
        usage_ratio = current_usage / self.max_tokens

        if usage_ratio >= self.thresholds[0]:
            self._compress(level=self._get_compression_level(usage_ratio))

    def _estimate_tokens(self):
        return sum(len(m["content"]) // 4 for m in self.messages)

    def _get_compression_level(self, ratio):
        for i, threshold in enumerate(self.thresholds):
            if ratio < threshold:
                return i
        return len(self.thresholds)

    def _compress(self, level: int):
        """Apply compression based on severity level."""
        if level == 1:  # 70% threshold: Light compression
            self._remove_redundant_messages()
        elif level == 2:  # 85% threshold: Medium compression
            self._summarize_old_messages(keep_recent=10)
        else:  # 95% threshold: Aggressive compression
            self._summarize_old_messages(keep_recent=5)

    def _remove_redundant_messages(self):
        """Remove duplicate or low-value messages."""
        # Implementation: Use semantic deduplication
        pass

    def _summarize_old_messages(self, keep_recent: int):
        """Summarize older messages, keep recent ones verbatim."""
        if len(self.messages) <= keep_recent:
            return

        # Messages to summarize
        to_summarize = self.messages[:-keep_recent]
        recent = self.messages[-keep_recent:]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{m['role'].upper()}: {m['content']}"
            for m in to_summarize
        ])

        response = client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                "role": "user",
                "content": f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Replace old messages with summary
        summary = {
            "role": "system",
            "content": f"[Summary]\n{response.content[0].text}"
        }
        self.messages = [summary] + recent

# Usage
compressor = ProgressiveCompressor()

for i in range(100):
    compressor.add_message("user", f"Message {i}")
    compressor.add_message("assistant", f"Response {i}")

Using Anthropic Prompt Caching (90% Cost Reduction)

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Build context with cache control
messages = [
    {
        "role": "user",
        "content": [
            {
                "type": "text",
                "text": "Long conversation context here...",
                "cache_control": {"type": "ephemeral"}  # Cache this
            }
        ]
    },
    {
        "role": "assistant",
        "content": "Previous response..."
    },
    {
        "role": "user",
        "content": "New question"  # Not cached, changes frequently
    }
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

# Cache hit reduces costs by 90% for cached content

Core Concepts

Context Windows and Token Limits

Context window: Maximum tokens an LLM can process in a single request (input + output).

Current limits (2025):

  • Claude 3.5 Sonnet: 200K tokens (~150K words, ~600 pages)
  • GPT-4 Turbo: 128K tokens (~96K words, ~384 pages)
  • Gemini 1.5 Pro: 2M tokens (~1.5M words, ~6000 pages)

Token estimation (see the helper sketch below):

  • English: ~4 characters per token
  • Code: ~3 characters per token
  • Rule of thumb: 1 token ≈ 0.75 words
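
A minimal helper that applies these rules of thumb (an approximation only; exact counts require the provider's tokenizer or token-counting API):

def estimate_tokens(text: str, kind: str = "english") -> int:
    """Rough token estimate using the chars-per-token rules of thumb above."""
    chars_per_token = 3 if kind == "code" else 4
    return max(1, len(text) // chars_per_token)

# Roughly 250 tokens for 1,000 characters of English prose
print(estimate_tokens("x" * 1000))  # -> 250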

Why compression matters:

  • Cost: Claude Sonnet costs $3/$15 per 1M input/output tokens
  • Latency: Larger contexts increase processing time
  • Quality: Excessive context can dilute attention on relevant information

Compression Ratios

Compression ratio = Original tokens / Compressed tokens

Industry benchmarks:

  • Extractive summarization: 2-3x
  • Abstractive summarization: 5-10x
  • Hierarchical summarization: 20x+
  • LLMLingua (prompt compression): 20x with 1.5% accuracy loss
  • KVzip (KV cache compression): 3-4x with 2x speed improvement

Target ratios by use case:

  • Customer support: 5-7x (preserve details)
  • General chat: 8-12x (balance quality/efficiency)
  • Code assistants: 3-5x (preserve technical accuracy)
  • Long documents: 15-20x (extract key insights)

Progressive Compression Thresholds

Industry standard pattern:

Context Usage    Action                     Technique
─────────────────────────────────────────────────────────
0-70%           No compression             Store verbatim
70-85%          Light compression          Remove redundancy
85-95%          Medium compression         Summarize old messages
95-100%         Aggressive compression     Hierarchical + RAG

Implementation guidelines:

  • 70% threshold: Remove duplicate/redundant messages, semantic deduplication
  • 85% threshold: Summarize messages older than 20 turns, keep recent 10-15
  • 95% threshold: Multi-level hierarchical summarization + vector store archival
  • Emergency (100%): Drop least important messages, aggressive summarization

Compression Techniques

1. Summarization Techniques

1.1 Extractive Summarization

Selects key sentences/phrases without modification.

Pros: No hallucination, fast, deterministic
Cons: Limited compression (2-3x), may feel disjointed
Best for: Legal/compliance, short-term compression

from sklearn.feature_extraction.text import TfidfVectorizer
import numpy as np

def extractive_compress(messages: list, compression_ratio: float = 0.3):
    """Extract most important messages using TF-IDF scoring."""
    texts = [msg['content'] for msg in messages]

    # Calculate TF-IDF scores
    vectorizer = TfidfVectorizer()
    tfidf_matrix = vectorizer.fit_transform(texts)
    scores = np.array(tfidf_matrix.sum(axis=1)).flatten()

    # Select top messages
    n_keep = max(1, int(len(messages) * compression_ratio))
    top_indices = sorted(np.argsort(scores)[-n_keep:])

    return [messages[i] for i in top_indices]
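
Example usage; the sample messages are illustrative:

history = [
    {"role": "user", "content": "How do I reset my password?"},
    {"role": "assistant", "content": "Go to Settings > Security and choose Reset."},
    {"role": "user", "content": "Thanks, that worked!"},
]

# Keep roughly the top 30% of messages by TF-IDF score
key_messages = extractive_compress(history, compression_ratio=0.3)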

1.2 Abstractive Summarization

Uses LLMs to semantically condense conversation history.

Pros: Higher compression (5-10x), coherent, synthesizes information
Cons: Risk of hallucination, higher cost, less deterministic
Best for: General chat, customer support, multi-session continuity

from anthropic import Anthropic

def abstractive_compress(messages: list, client: Anthropic):
    """Generate semantic summary using Claude."""
    conversation_text = "\n\n".join([
        f"{msg['role'].upper()}: {msg['content']}"
        for msg in messages
    ])

    response = client.messages.create(
        model="claude-3-5-sonnet-20241022",
        max_tokens=500,
        messages=[{
            "role": "user",
            "content": f"""Summarize this conversation, preserving:
1. Key decisions made
2. Important context and facts
3. Unresolved questions
4. Action items

Conversation:
{conversation_text}

Summary (aim for 1/5 the original length):"""
        }]
    )

    return {
        "role": "assistant",
        "content": f"[Summary]\n{response.content[0].text}"
    }
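
Example usage; `history` is a hypothetical list of message dicts:

client = Anthropic(api_key="your-api-key")

# Summarize everything except the last 5 turns, then keep those verbatim
summary_message = abstractive_compress(history[:-5], client)
compressed_history = [summary_message] + history[-5:]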

1.3 Hierarchical Summarization (Multi-Level)

Creates summaries of summaries in a tree structure.

Pros: Extreme compression (20x+), handles 1M+ token conversations
Cons: Complex implementation, multiple LLM calls, information loss accumulates
Best for: Long-running conversations, multi-session applications

Architecture:

Level 0 (Raw):    [Msg1][Msg2][Msg3][Msg4][Msg5][Msg6][Msg7][Msg8]
Level 1 (Chunk):  [Summary1-2]  [Summary3-4]  [Summary5-6]  [Summary7-8]
Level 2 (Group):  [Summary1-4]              [Summary5-8]
Level 3 (Session): [Overall Session Summary]

from anthropic import Anthropic
from typing import List, Dict

class HierarchicalMemory:
    def __init__(self, client: Anthropic, chunk_size: int = 10):
        self.client = client
        self.chunk_size = chunk_size
        self.levels: List[List[Dict]] = [[]]  # Level 0 = raw messages

    def add_message(self, message: Dict):
        """Add message and trigger summarization if needed."""
        self.levels[0].append(message)

        if len(self.levels[0]) >= self.chunk_size * 2:
            self._summarize_level(0)

    def _summarize_level(self, level: int):
        """Summarize a level into the next higher level."""
        messages = self.levels[level]

        # Ensure next level exists
        while len(self.levels) <= level + 1:
            self.levels.append([])

        # Summarize first chunk
        chunk = messages[:self.chunk_size]
        summary = self._generate_summary(chunk, level)

        # Move to next level
        self.levels[level + 1].append(summary)
        self.levels[level] = messages[self.chunk_size:]

        # Recursively check if next level needs summarization
        if len(self.levels[level + 1]) >= self.chunk_size * 2:
            self._summarize_level(level + 1)

    def _generate_summary(self, messages: List[Dict], level: int) -> Dict:
        """Generate summary for a chunk."""
        conversation_text = "\n\n".join([
            f"{msg['role'].upper()}: {msg['content']}"
            for msg in messages
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=300,
            messages=[{
                "role": "user",
                "content": f"Summarize this Level {level} conversation chunk:\n\n{conversation_text}"
            }]
        )

        return {
            "role": "system",
            "content": f"[L{level+1} Summary] {response.content[0].text}",
            "level": level + 1
        }

    def get_context(self, max_tokens: int = 4000) -> List[Dict]:
        """Retrieve context within token budget."""
        context = []
        token_count = 0

        # Prioritize recent raw messages
        for msg in reversed(self.levels[0]):
            msg_tokens = len(msg['content']) // 4
            if token_count + msg_tokens > max_tokens * 0.6:
                break
            context.insert(0, msg)
            token_count += msg_tokens

        # Add summaries from higher levels
        for level in range(1, len(self.levels)):
            for summary in self.levels[level]:
                summary_tokens = len(summary['content']) // 4
                if token_count + summary_tokens > max_tokens:
                    break
                context.insert(0, summary)
                token_count += summary_tokens

        return context
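
A short usage sketch of the class above (message contents are placeholders):

client = Anthropic(api_key="your-api-key")
memory = HierarchicalMemory(client, chunk_size=10)

for i in range(200):
    memory.add_message({
        "role": "user" if i % 2 == 0 else "assistant",
        "content": f"Turn {i} of a long conversation..."
    })

# Mixed context: higher-level summaries plus recent raw messages
context = memory.get_context(max_tokens=4000)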

Academic reference: "Recursively Summarizing Enables Long-Term Dialogue Memory in Large Language Models" (arXiv:2308.15022)

1.4 Rolling Summarization (Continuous)

Continuously compresses conversation with sliding window.

Pros: Low latency, predictable token usage, simple
Cons: Early details over-compressed, no information recovery
Best for: Real-time chat, streaming conversations

from anthropic import Anthropic

class RollingMemory:
    def __init__(self, client: Anthropic, window_size: int = 10, compress_threshold: int = 15):
        self.client = client
        self.window_size = window_size
        self.compress_threshold = compress_threshold
        self.rolling_summary = None
        self.recent_messages = []

    def add_message(self, message: dict):
        self.recent_messages.append(message)

        if len(self.recent_messages) >= self.compress_threshold:
            self._compress()

    def _compress(self):
        """Compress older messages into rolling summary."""
        messages_to_compress = self.recent_messages[:-self.window_size]

        parts = []
        if self.rolling_summary:
            parts.append(f"Existing summary:\n{self.rolling_summary}")

        parts.append("\nNew messages:\n" + "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in messages_to_compress
        ]))

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=400,
            messages=[{
                "role": "user",
                "content": "\n".join(parts) + "\n\nUpdate the summary:"
            }]
        )

        self.rolling_summary = response.content[0].text
        self.recent_messages = self.recent_messages[-self.window_size:]

    def get_context(self):
        context = []
        if self.rolling_summary:
            context.append({
                "role": "system",
                "content": f"[Summary]\n{self.rolling_summary}"
            })
        context.extend(self.recent_messages)
        return context
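
Example usage; `conversation_turns` is a hypothetical iterable of message dicts:

client = Anthropic(api_key="your-api-key")
memory = RollingMemory(client, window_size=10, compress_threshold=15)

for turn in conversation_turns:
    memory.add_message(turn)

# Rolling summary (if any) followed by the most recent verbatim messages
context = memory.get_context()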

2. Embedding-Based Approaches

2.1 RAG (Retrieval-Augmented Generation)

Store full conversation in vector database, retrieve only relevant chunks.

Pros: Extremely scalable, no information loss, high relevance
Cons: Requires vector DB infrastructure, retrieval latency
Best for: Knowledge bases, customer support with large history

from anthropic import Anthropic
from openai import OpenAI
import chromadb

class RAGMemory:
    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Initialize vector store
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(
            name="conversation",
            metadata={"hnsw:space": "cosine"}
        )

        self.recent_messages = []
        self.recent_window = 5
        self.message_counter = 0

    def add_message(self, message: dict):
        """Add to recent memory and vector store."""
        self.recent_messages.append(message)

        if len(self.recent_messages) > self.recent_window:
            old_msg = self.recent_messages.pop(0)
            self._store_in_vectordb(old_msg)

    def _store_in_vectordb(self, message: dict):
        """Archive to vector database."""
        # Generate embedding
        response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{"role": message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def retrieve_context(self, query: str, max_tokens: int = 4000):
        """Retrieve relevant context using RAG."""
        context = []
        token_count = 0

        # 1. Recent messages (short-term memory)
        for msg in self.recent_messages:
            context.append(msg)
            token_count += len(msg['content']) // 4

        # 2. Retrieve relevant historical context
        if token_count < max_tokens:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=query
            )

            n_results = min(10, (max_tokens - token_count) // 100)
            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=n_results
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens:
                    break

                metadata = results['metadatas'][0][i]
                context.insert(0, {
                    "role": metadata['role'],
                    "content": f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        return context
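
Usage sketch, assuming both API clients are configured:

anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")

memory = RAGMemory(anthropic_client, openai_client)

for i in range(12):
    memory.add_message({
        "role": "user" if i % 2 == 0 else "assistant",
        "content": f"Turn {i}: discussion of pricing tiers and features..."
    })

# Recent messages plus semantically relevant archived turns
context = memory.retrieve_context("What did we decide about pricing?", max_tokens=4000)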

Vector database options:

  • ChromaDB: Embedded, easy local development
  • Pinecone: Managed, 50ms p95 latency
  • Weaviate: Open-source, hybrid search
  • Qdrant: High performance, payload filtering

2.2 Vector Search and Clustering

Group similar messages into clusters, represent with centroids.

Pros: Reduces redundancy, identifies themes, multi-topic handling
Cons: Requires sufficient data, may lose nuances
Best for: Multi-topic conversations, meeting summaries

from sklearn.cluster import KMeans
from openai import OpenAI
import numpy as np

class ClusteredMemory:
    def __init__(self, openai_client: OpenAI, n_clusters: int = 5):
        self.client = openai_client
        self.n_clusters = n_clusters
        self.messages = []
        self.embeddings = []

    def add_messages(self, messages: list):
        for msg in messages:
            self.messages.append(msg)

            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            self.embeddings.append(response.data[0].embedding)

    def compress_by_clustering(self):
        """Cluster messages and return representatives."""
        if len(self.messages) < self.n_clusters:
            return self.messages

        embeddings_array = np.array(self.embeddings)
        kmeans = KMeans(n_clusters=self.n_clusters, random_state=42)
        labels = kmeans.fit_predict(embeddings_array)

        # Select message closest to each centroid
        compressed = []
        for cluster_id in range(self.n_clusters):
            cluster_indices = np.where(labels == cluster_id)[0]
            centroid = kmeans.cluster_centers_[cluster_id]
            cluster_embeddings = embeddings_array[cluster_indices]
            distances = np.linalg.norm(cluster_embeddings - centroid, axis=1)
            closest_idx = cluster_indices[np.argmin(distances)]

            compressed.append({
                **self.messages[closest_idx],
                "cluster_id": int(cluster_id),
                "cluster_size": len(cluster_indices)
            })

        return compressed
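
Usage sketch with placeholder messages:

openai_client = OpenAI(api_key="your-openai-key")
memory = ClusteredMemory(openai_client, n_clusters=3)

memory.add_messages([
    {"role": "user", "content": "Let's talk about deployment options."},
    {"role": "assistant", "content": "You can deploy on containers, serverless, or VMs."},
    {"role": "user", "content": "What about pricing?"},
    {"role": "assistant", "content": "Pricing depends on instance size and traffic."},
    {"role": "user", "content": "And monitoring?"},
    {"role": "assistant", "content": "Standard metrics dashboards and alerting both work."},
])

# One representative message per topic cluster
representatives = memory.compress_by_clustering()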

2.3 Semantic Deduplication

Remove semantically similar messages that convey redundant information.

Pros: Reduces redundancy without losing unique content
Cons: Requires threshold tuning, O(n²) complexity
Best for: FAQ systems, repetitive conversations

from openai import OpenAI
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

class SemanticDeduplicator:
    def __init__(self, openai_client: OpenAI, similarity_threshold: float = 0.85):
        self.client = openai_client
        self.threshold = similarity_threshold

    def deduplicate(self, messages: list):
        """Remove semantically similar messages."""
        if len(messages) <= 1:
            return messages

        # Generate embeddings
        embeddings = []
        for msg in messages:
            response = self.client.embeddings.create(
                model="text-embedding-3-small",
                input=msg['content']
            )
            embeddings.append(response.data[0].embedding)

        embeddings_array = np.array(embeddings)
        similarity_matrix = cosine_similarity(embeddings_array)

        # Mark unique messages
        keep_indices = []
        for i in range(len(messages)):
            is_unique = True
            for j in keep_indices:
                if similarity_matrix[i][j] > self.threshold:
                    is_unique = False
                    break

            if is_unique:
                keep_indices.append(i)

        return [messages[i] for i in keep_indices]
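
Usage sketch:

openai_client = OpenAI(api_key="your-openai-key")
dedup = SemanticDeduplicator(openai_client, similarity_threshold=0.85)

messages = [
    {"role": "user", "content": "How do I cancel my subscription?"},
    {"role": "user", "content": "What's the process for cancelling my subscription?"},
    {"role": "user", "content": "Can I export my data first?"},
]

unique_messages = dedup.deduplicate(messages)  # near-duplicate questions collapse to one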

3. Token-Efficient Strategies

3.1 Message Prioritization

Assign importance scores and retain only high-priority content.

Pros: Retains most important information, flexible criteria
Cons: Scoring is heuristic-based, may break flow
Best for: Mixed-importance conversations, filtering noise

import re

class MessagePrioritizer:
    def score_message(self, msg: dict, index: int, total: int) -> float:
        """Calculate composite importance score."""
        scores = []

        # Length score (longer = more info)
        scores.append(min(len(msg['content']) / 500, 1.0))

        # Question score
        if msg['role'] == 'user':
            scores.append(min(msg['content'].count('?') * 0.5, 1.0))

        # Entity score (capitalized words)
        entities = len(re.findall(r'\b[A-Z][a-z]+', msg['content']))
        scores.append(min(entities / 10, 1.0))

        # Recency score (linear decay)
        scores.append(index / max(total - 1, 1))

        # Role score
        scores.append(0.6 if msg['role'] == 'user' else 0.4)

        return sum(scores) / len(scores)

    def prioritize(self, messages: list, target_count: int):
        """Select top N messages by priority."""
        scored = [
            (msg, self.score_message(msg, i, len(messages)), i)
            for i, msg in enumerate(messages)
        ]

        scored.sort(key=lambda x: x[1], reverse=True)
        top_messages = scored[:target_count]
        top_messages.sort(key=lambda x: x[2])  # Restore chronological order

        return [msg for msg, score, idx in top_messages]
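
Usage sketch; `history` is a hypothetical list of message dicts:

prioritizer = MessagePrioritizer()

# Keep the 20 highest-scoring messages, in chronological order
important = prioritizer.prioritize(history, target_count=20)

# Inspect an individual score
score = prioritizer.score_message(history[0], index=0, total=len(history))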

3.2 Delta Compression

Store only changes between consecutive messages.

Pros: Highly efficient for incremental changes
Cons: Reconstruction overhead, not suitable for all content
Best for: Code assistants with incremental edits

import difflib

class DeltaCompressor:
    def __init__(self):
        self.base_messages = []
        self.deltas = []

    def add_message(self, message: dict):
        if not self.base_messages:
            self.base_messages.append(message)
            return

        # Compare against the most recent base message
        last_msg = self.base_messages[-1]

        if last_msg['role'] == message['role']:
            # Calculate delta
            diff = list(difflib.unified_diff(
                last_msg['content'].splitlines(),
                message['content'].splitlines(),
                lineterm=''
            ))

            if len('\n'.join(diff)) < len(message['content']) * 0.7:
                # Store as delta if compression achieved
                self.deltas.append({
                    'base_index': len(self.base_messages) - 1,
                    'delta': diff,
                    'role': message['role']
                })
                return

        # Store as new base message
        self.base_messages.append(message)

    def reconstruct(self):
        """Reconstruct full conversation from bases + deltas."""
        messages = self.base_messages.copy()

        for delta_info in self.deltas:
            base_content = messages[delta_info['base_index']]['content']
            # Apply diff to reconstruct (simplified)
            reconstructed = base_content  # Full implementation would apply diff
            messages.append({
                'role': delta_info['role'],
                'content': reconstructed
            })

        return messages
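
Usage sketch; note that reconstruct() above is intentionally simplified and does not re-apply diffs:

compressor = DeltaCompressor()

original = "\n".join(f"line {i}: this line stays exactly the same" for i in range(30))
edited = original.replace("line 10: this line stays exactly the same",
                          "line 10: this line was slightly edited")

compressor.add_message({"role": "assistant", "content": original})
compressor.add_message({"role": "assistant", "content": edited})

print(len(compressor.base_messages), len(compressor.deltas))  # -> 1 1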

4. LangChain Memory Types

4.1 ConversationSummaryMemory

Automatically summarizes conversation as it progresses.

from langchain.memory import ConversationSummaryMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationSummaryMemory(llm=llm)

# Add conversation
memory.save_context(
    {"input": "Hi, I'm working on a Python project"},
    {"output": "Great! How can I help with your Python project?"}
)

# Get summary
summary = memory.load_memory_variables({})
print(summary['history'])

Pros: Automatic summarization, simple API
Cons: Every turn triggers LLM call
Best for: Medium conversations (20-50 turns)

4.2 ConversationSummaryBufferMemory

Hybrid: Recent messages verbatim, older summarized.

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,  # Summarize when exceeding
    return_messages=True
)

# Add conversation
for i in range(50):
    memory.save_context(
        {"input": f"Question {i}"},
        {"output": f"Answer {i}"}
    )

# Automatically keeps recent messages + summary of old
context = memory.load_memory_variables({})

Pros: Best balance of detail and compression
Cons: Requires token limit tuning
Best for: Most production applications

4.3 ConversationTokenBufferMemory

Maintains fixed token budget, drops oldest when exceeded.

from langchain.memory import ConversationTokenBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")

memory = ConversationTokenBufferMemory(
    llm=llm,
    max_token_limit=2000
)

# Simple FIFO when token limit exceeded

Pros: Predictable token usage, simple
Cons: Loses old information completely
Best for: Real-time chat with strict limits

4.4 VectorStoreRetrieverMemory

Stores all messages in vector database, retrieves relevant ones.

from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

embeddings = OpenAIEmbeddings()
vectorstore = Chroma(embedding_function=embeddings)

memory = VectorStoreRetrieverMemory(
    retriever=vectorstore.as_retriever(search_kwargs={"k": 5})
)

# Automatically retrieves most relevant context

Pros: Infinite conversation length, semantic retrieval
Cons: Requires vector DB, retrieval overhead
Best for: Long-running conversations, knowledge bases

5. Anthropic-Specific Patterns

5.1 Prompt Caching (90% Cost Reduction)

Cache static context to reduce token costs.

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

# Long conversation context
conversation_history = [
    {"role": "user", "content": "Message 1"},
    {"role": "assistant", "content": "Response 1"},
    # ... many more messages
]

# Mark context for caching
messages = []
for i, msg in enumerate(conversation_history[:-1]):
    content = msg['content']

    # Add cache control to last context message
    if i == len(conversation_history) - 2:
        messages.append({
            "role": msg['role'],
            "content": [
                {
                    "type": "text",
                    "text": content,
                    "cache_control": {"type": "ephemeral"}
                }
            ]
        })
    else:
        messages.append(msg)

# Add new user message (not cached)
messages.append(conversation_history[-1])

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=messages
)

# Subsequent calls with same cached context cost 90% less

Cache TTL: 5 minutes
Savings: 90% cost reduction for cached tokens
Limits: Max 4 cache breakpoints per request

Best practices:

  • Cache conversation history, not current query
  • Update cache when context changes significantly
  • Combine with summarization for maximum efficiency

5.2 Extended Thinking for Compression Planning

Use extended thinking to plan optimal compression strategy.

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

response = client.messages.create(
    model="claude-3-7-sonnet-20250219",
    max_tokens=16000,
    thinking={
        "type": "enabled",
        "budget_tokens": 10000
    },
    messages=[{
        "role": "user",
        "content": f"""Analyze this conversation and recommend compression:

{conversation_text}

Current token count: {current_tokens}
Target: {target_tokens}
Required compression: {compression_ratio}x

Recommend optimal strategy."""
    }]
)

# Access thinking process
thinking_content = [
    block for block in response.content
    if block.type == "thinking"
]

# Get compression recommendation
recommendation = response.content[-1].text

Production Patterns

Checkpointing and Persistence

Save compression state for recovery and resume.

import json
import time
from pathlib import Path

class PersistentMemory:
    def __init__(self, checkpoint_dir: str = "./checkpoints"):
        self.checkpoint_dir = Path(checkpoint_dir)
        self.checkpoint_dir.mkdir(exist_ok=True)
        self.memory = []
        self.summary = None

    def save_checkpoint(self, session_id: str):
        """Save current memory state."""
        checkpoint = {
            'messages': self.memory,
            'summary': self.summary,
            'timestamp': time.time()
        }

        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"
        with open(checkpoint_file, 'w') as f:
            json.dump(checkpoint, f, indent=2)

    def load_checkpoint(self, session_id: str):
        """Load memory state from checkpoint."""
        checkpoint_file = self.checkpoint_dir / f"{session_id}.json"

        if checkpoint_file.exists():
            with open(checkpoint_file, 'r') as f:
                checkpoint = json.load(f)

            self.memory = checkpoint['messages']
            self.summary = checkpoint.get('summary')
            return True

        return False

    def auto_checkpoint(self, session_id: str, interval: int = 10):
        """Automatically save every N messages."""
        if len(self.memory) % interval == 0:
            self.save_checkpoint(session_id)

Resume Workflows

Continue conversations across sessions.

from anthropic import Anthropic
import json
import time

class ResumableConversation:
    def __init__(self, client: Anthropic, session_id: str):
        self.client = client
        self.session_id = session_id
        self.memory = self._load_or_create()

    def _load_or_create(self):
        """Load existing session or create new."""
        try:
            with open(f'sessions/{self.session_id}.json', 'r') as f:
                return json.load(f)
        except FileNotFoundError:
            return {
                'messages': [],
                'summary': None,
                'created_at': time.time()
            }

    def add_turn(self, user_message: str):
        """Add user message and get response."""
        # Add user message
        self.memory['messages'].append({
            'role': 'user',
            'content': user_message
        })

        # Build context (with compression); the new user message is already
        # the last entry of the stored history, so it is included here
        context = self._build_context()

        # Get response
        response = self.client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            messages=context
        )

        # Save response
        assistant_message = response.content[0].text
        self.memory['messages'].append({
            'role': 'assistant',
            'content': assistant_message
        })

        # Compress if needed
        if len(self.memory['messages']) > 20:
            self._compress()

        # Save state
        self._save()

        return assistant_message

    def _build_context(self):
        """Build context with compression."""
        context = []

        # Add summary if it exists. The Messages API only accepts 'user' and
        # 'assistant' roles, so inject the summary as a user turn (or pass it
        # via the top-level `system` parameter instead).
        if self.memory['summary']:
            context.append({
                'role': 'user',
                'content': f"[Previous conversation summary]\n{self.memory['summary']}"
            })

        # Add recent messages
        context.extend(self.memory['messages'][-10:])

        return context

    def _compress(self):
        """Compress older messages."""
        if len(self.memory['messages']) < 15:
            return

        # Messages to summarize
        to_summarize = self.memory['messages'][:-10]

        # Generate summary
        conversation_text = "\n\n".join([
            f"{msg['role']}: {msg['content']}"
            for msg in to_summarize
        ])

        response = self.client.messages.create(
            model="claude-3-5-haiku-20241022",
            max_tokens=500,
            messages=[{
                'role': 'user',
                'content': f"Summarize this conversation:\n\n{conversation_text}"
            }]
        )

        # Update memory
        self.memory['summary'] = response.content[0].text
        self.memory['messages'] = self.memory['messages'][-10:]

    def _save(self):
        """Save session to disk."""
        with open(f'sessions/{self.session_id}.json', 'w') as f:
            json.dump(self.memory, f, indent=2)

# Usage
client = Anthropic(api_key="your-api-key")
conversation = ResumableConversation(client, session_id="user123_session1")

# Continue across multiple sessions
response1 = conversation.add_turn("What's Python?")
# ... later session
response2 = conversation.add_turn("Show me an example")  # Remembers context

Hybrid Approaches (Best Practice)

Combine multiple techniques for optimal results.

from anthropic import Anthropic
from openai import OpenAI
import chromadb

class HybridMemorySystem:
    """
    Combines:
    - Rolling summarization (short-term compression)
    - RAG retrieval (long-term memory)
    - Prompt caching (cost optimization)
    - Progressive compression (adaptive behavior)
    """

    def __init__(self, anthropic_client: Anthropic, openai_client: OpenAI):
        self.anthropic = anthropic_client
        self.openai = openai_client

        # Recent messages (verbatim)
        self.recent_messages = []
        self.recent_window = 10

        # Rolling summary
        self.rolling_summary = None

        # Vector store (long-term)
        self.chroma = chromadb.Client()
        self.collection = self.chroma.create_collection(name="memory")
        self.message_counter = 0

        # Compression thresholds
        self.thresholds = {
            'light': 0.70,    # Start basic compression
            'medium': 0.85,   # Aggressive summarization
            'heavy': 0.95     # Emergency measures
        }

    def add_message(self, message: dict):
        """Add message with intelligent compression."""
        self.recent_messages.append(message)

        # Check compression needs
        usage_ratio = self._estimate_usage()

        if usage_ratio >= self.thresholds['heavy']:
            self._emergency_compress()
        elif usage_ratio >= self.thresholds['medium']:
            self._medium_compress()
        elif usage_ratio >= self.thresholds['light']:
            self._light_compress()

    def _light_compress(self):
        """Remove redundancy, archive to vector store."""
        if len(self.recent_messages) > self.recent_window * 1.5:
            # Archive oldest to vector store
            to_archive = self.recent_messages[:5]
            for msg in to_archive:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[5:]

    def _medium_compress(self):
        """Generate rolling summary, aggressive archival."""
        if len(self.recent_messages) > self.recent_window:
            # Summarize older messages
            to_summarize = self.recent_messages[:-self.recent_window]

            summary_text = "\n\n".join([
                f"{msg['role']}: {msg['content']}"
                for msg in to_summarize
            ])

            if self.rolling_summary:
                summary_text = f"Existing: {self.rolling_summary}\n\nNew: {summary_text}"

            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=400,
                messages=[{
                    'role': 'user',
                    'content': f"Update summary:\n{summary_text}"
                }]
            )

            self.rolling_summary = response.content[0].text

            # Archive all summarized messages
            for msg in to_summarize:
                self._archive_to_vectorstore(msg)

            self.recent_messages = self.recent_messages[-self.recent_window:]

    def _emergency_compress(self):
        """Extreme compression for near-limit situations."""
        # Keep only 5 most recent messages
        to_archive = self.recent_messages[:-5]
        for msg in to_archive:
            self._archive_to_vectorstore(msg)

        self.recent_messages = self.recent_messages[-5:]

        # Compress summary further if needed
        if self.rolling_summary and len(self.rolling_summary) > 1000:
            response = self.anthropic.messages.create(
                model="claude-3-5-haiku-20241022",
                max_tokens=200,
                messages=[{
                    'role': 'user',
                    'content': f"Create ultra-concise summary:\n{self.rolling_summary}"
                }]
            )
            self.rolling_summary = response.content[0].text

    def _archive_to_vectorstore(self, message: dict):
        """Store in vector database for retrieval."""
        embedding_response = self.openai.embeddings.create(
            model="text-embedding-3-small",
            input=message['content']
        )

        self.collection.add(
            embeddings=[embedding_response.data[0].embedding],
            documents=[message['content']],
            metadatas=[{'role': message['role']}],
            ids=[f"msg_{self.message_counter}"]
        )
        self.message_counter += 1

    def get_context(self, current_query: str, max_tokens: int = 8000):
        """Build optimal context for current query."""
        context = []
        token_count = 0

        # 1. Add rolling summary (if it exists). Use the 'user' role because
        #    the Messages API only accepts 'user'/'assistant' roles inside
        #    `messages`; the block is marked for prompt caching.
        if self.rolling_summary:
            summary_msg = {
                'role': 'user',
                'content': [
                    {
                        'type': 'text',
                        'text': f"[Conversation Summary]\n{self.rolling_summary}",
                        'cache_control': {'type': 'ephemeral'}  # Cache it
                    }
                ]
            }
            context.append(summary_msg)
            token_count += len(self.rolling_summary) // 4

        # 2. Retrieve relevant historical context (RAG)
        if token_count < max_tokens * 0.3:
            query_embedding = self.openai.embeddings.create(
                model="text-embedding-3-small",
                input=current_query
            )

            results = self.collection.query(
                query_embeddings=[query_embedding.data[0].embedding],
                n_results=5
            )

            for i, doc in enumerate(results['documents'][0]):
                if token_count + len(doc) // 4 > max_tokens * 0.3:
                    break

                metadata = results['metadatas'][0][i]
                context.append({
                    'role': metadata['role'],
                    'content': f"[Retrieved] {doc}"
                })
                token_count += len(doc) // 4

        # 3. Add recent messages verbatim
        for msg in self.recent_messages:
            if token_count + len(msg['content']) // 4 > max_tokens * 0.8:
                break
            context.append(msg)
            token_count += len(msg['content']) // 4

        return context

    def _estimate_usage(self):
        """Estimate current context window usage."""
        total_tokens = 0

        if self.rolling_summary:
            total_tokens += len(self.rolling_summary) // 4

        for msg in self.recent_messages:
            total_tokens += len(msg['content']) // 4

        return total_tokens / 200000  # Claude Sonnet context window

# Usage
anthropic_client = Anthropic(api_key="your-anthropic-key")
openai_client = OpenAI(api_key="your-openai-key")

memory = HybridMemorySystem(anthropic_client, openai_client)

# Add messages over time
for i in range(1000):
    memory.add_message({
        'role': 'user' if i % 2 == 0 else 'assistant',
        'content': f"Message {i} with some content..."
    })

# Retrieve optimized context
current_query = "What did we discuss about pricing?"
context = memory.get_context(current_query)

# Use with Claude
response = anthropic_client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=context + [{
        'role': 'user',
        'content': current_query
    }]
)

Performance Benchmarks

Compression Efficiency

Technique        Compression Ratio   Quality Loss   Latency      Cost Impact
─────────────────────────────────────────────────────────────────────────────
Extractive       2-3x                <1%            <10ms        None
Abstractive      5-10x               2-5%           1-2s         +$0.001/turn
Hierarchical     20x+                5-8%           2-5s         +$0.003/turn
LLMLingua        20x                 1.5%           500ms        None
RAG              Variable            <1%            100-300ms    +$0.0005/turn
Prompt Caching   N/A                 0%             0ms          -90%

Token Savings by Use Case

Customer Support (50-turn conversation):

  • No compression: ~8,000 tokens/request
  • Rolling summary: ~2,000 tokens/request (75% reduction)
  • Hybrid (RAG + summary): ~1,500 tokens/request (81% reduction)

Code Assistant (100-turn session):

  • No compression: ~25,000 tokens/request
  • Hierarchical: ~5,000 tokens/request (80% reduction)
  • Hybrid + caching: ~1,000 tokens/request effective (96% cost reduction)

Educational Tutor (multi-session):

  • No compression: Would exceed context window
  • RAG + summarization: ~3,000 tokens/request
  • Effectively unlimited session length

Cost Analysis

Example: Claude Sonnet pricing ($3 input, $15 output per 1M tokens)

1,000 conversations, 50 turns each:

  • No compression:

    • Avg 8K tokens/request × 50K requests = 400M tokens
    • Cost: $1,200
  • With rolling summarization:

    • Avg 2K tokens/request × 50K requests = 100M tokens
    • Summarization overhead: +10M tokens
    • Cost: $330 (72% savings)
  • With hybrid system + caching:

    • First turn: 2K tokens (no cache)
    • Subsequent: 200 tokens effective (90% cache hit)
    • Total: ~15M tokens effective
    • Cost: $45 (96% savings)
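
The figures above follow from simple arithmetic; a minimal sketch using the input-token price only ($3 per 1M tokens), with token counts taken from the scenario:

INPUT_PRICE_PER_M = 3.00  # USD per 1M input tokens (Claude Sonnet)

def cost_usd(total_tokens: float) -> float:
    return total_tokens / 1e6 * INPUT_PRICE_PER_M

requests = 1_000 * 50  # 1,000 conversations x 50 turns each

no_compression = cost_usd(8_000 * requests)          # 400M tokens -> $1,200
rolling_summary = cost_usd(2_000 * requests + 10e6)  # 110M tokens -> $330
hybrid_cached = cost_usd(15e6)                       # ~15M effective tokens -> $45

print(no_compression, rolling_summary, hybrid_cached)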

Tool Recommendations

Memory Management Tools

Mem0 (Recommended for Production)

Best for: Hybrid memory systems with minimal code

from mem0 import MemoryClient

client = MemoryClient(api_key="your-mem0-key")

# Automatically handles compression, summarization, RAG
memory = client.create_memory(
    user_id="user123",
    messages=[
        {"role": "user", "content": "I'm working on a Python project"},
        {"role": "assistant", "content": "Great! What kind of project?"}
    ]
)

# Retrieve relevant context
context = client.get_memory(
    user_id="user123",
    query="What programming language am I using?"
)

Features:

  • Automatic hierarchical summarization
  • Built-in RAG retrieval
  • Multi-user session management
  • Analytics dashboard

Pricing: $0.40/1K memory operations

Zep

Best for: Low-latency production deployments

from zep_python import ZepClient

client = ZepClient(api_key="your-zep-key")

# Add to session
client.memory.add_memory(
    session_id="session123",
    messages=[
        {"role": "user", "content": "Hello"},
        {"role": "assistant", "content": "Hi there!"}
    ]
)

# Auto-summarized retrieval
memory = client.memory.get_memory(session_id="session123")

Features:

  • <100ms retrieval latency
  • Automatic fact extraction
  • Entity recognition
  • Session management

Pricing: Open-source (self-hosted) or $0.50/1K operations (cloud)

ChromaDB

Best for: Self-hosted vector storage

import chromadb

client = chromadb.Client()
collection = client.create_collection("conversations")

# Store embeddings
collection.add(
    documents=["Message content"],
    embeddings=[[0.1, 0.2, ...]],
    ids=["msg1"]
)

# Retrieve
results = collection.query(
    query_embeddings=[[0.1, 0.2, ...]],
    n_results=5
)

Features:

  • Fully open-source
  • Embedded or client-server
  • Fast local development

Pricing: Free (self-hosted)

LangChain

Best for: Rapid prototyping and experimentation

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-sonnet-20241022")
memory = ConversationSummaryBufferMemory(llm=llm, max_token_limit=2000)

Features:

  • Multiple memory types
  • Framework integration
  • Extensive documentation

Pricing: Free (uses your LLM API costs)

Compression Libraries

LLMLingua

Best for: Extreme compression with minimal quality loss

from llmlingua import PromptCompressor

compressor = PromptCompressor()

compressed = compressor.compress_prompt(
    context="Long conversation history...",
    instruction="Current user query",
    target_token=500
)

# Achieves 20x compression with 1.5% accuracy loss

Features:

  • 20x compression ratios
  • <2% quality degradation
  • Fast inference (<500ms)

Pricing: Free (open-source)


Use Cases and Patterns

Chatbot (Customer Support)

Requirements:

  • Multi-turn conversations (50-100 turns)
  • Preserve customer context
  • Fast response times
  • Cost-efficient

Recommended approach:

  • ConversationSummaryBufferMemory (LangChain)
  • 70% threshold: Semantic deduplication
  • 85% threshold: Rolling summarization
  • Prompt caching for frequent patterns

Implementation:

from langchain.memory import ConversationSummaryBufferMemory
from langchain_anthropic import ChatAnthropic

llm = ChatAnthropic(model="claude-3-5-haiku-20241022")

memory = ConversationSummaryBufferMemory(
    llm=llm,
    max_token_limit=2000,
    return_messages=True
)

# Add customer conversation
for turn in customer_conversation:
    memory.save_context(
        {"input": turn['customer_message']},
        {"output": turn['agent_response']}
    )

# Retrieve compressed context
context = memory.load_memory_variables({})

Code Assistant

Requirements:

  • Long development sessions (100+ turns)
  • Preserve technical details
  • Handle large code blocks
  • Track incremental changes

Recommended approach:

  • Hierarchical summarization for overall context
  • RAG retrieval for specific code references
  • Delta compression for iterative edits
  • Prompt caching for system prompts

Implementation:

from anthropic import Anthropic

client = Anthropic(api_key="your-api-key")

class CodeAssistantMemory:
    def __init__(self):
        self.hierarchy = HierarchicalMemory(client, chunk_size=15)
        self.rag = RAGMemory(anthropic_client=client, openai_client=openai_client)
        self.deltas = DeltaCompressor()

    def add_interaction(self, code_change: dict):
        # Store in hierarchy
        self.hierarchy.add_message({
            'role': 'user',
            'content': code_change['description']
        })

        # Store in RAG for retrieval
        self.rag.add_message(code_change)

        # Store as delta if incremental
        if code_change.get('is_incremental'):
            self.deltas.add_message(code_change)

    def get_context(self, current_query: str):
        # Combine hierarchical summary + RAG retrieval
        summary_context = self.hierarchy.get_context(max_tokens=2000)
        rag_context = self.rag.retrieve_context(current_query, max_tokens=2000)

        return summary_context + rag_context

Educational Tutor

Requirements:

  • Multi-session tracking
  • Student progress persistence
  • Personalized context retrieval
  • Long-term knowledge retention

Recommended approach:

  • VectorStoreRetrieverMemory for multi-session
  • Fact extraction for student knowledge
  • Progressive compression across sessions
  • Resumable conversations

Implementation:

from langchain.memory import VectorStoreRetrieverMemory
from langchain_community.vectorstores import Chroma
from langchain_openai import OpenAIEmbeddings

class TutorMemory:
    def __init__(self, student_id: str):
        self.student_id = student_id

        # Vector store for all sessions
        embeddings = OpenAIEmbeddings()
        vectorstore = Chroma(
            collection_name=f"student_{student_id}",
            embedding_function=embeddings
        )

        self.memory = VectorStoreRetrieverMemory(
            retriever=vectorstore.as_retriever(search_kwargs={"k": 10})
        )

    def add_lesson_content(self, lesson: dict):
        """Add lesson interaction to student memory."""
        self.memory.save_context(
            {"input": lesson['topic']},
            {"output": lesson['explanation']}
        )

    def get_student_context(self, current_topic: str):
        """Retrieve relevant past lessons for current topic."""
        return self.memory.load_memory_variables({
            "prompt": current_topic
        })

Best Practices

1. Choose the Right Technique for Your Use Case

  • Short conversations (<20 turns): No compression needed
  • Medium conversations (20-50 turns): ConversationSummaryBufferMemory
  • Long conversations (50-100 turns): Hierarchical or rolling summarization
  • Very long (100+ turns): Hybrid (RAG + summarization + caching)
  • Multi-session: VectorStoreRetrieverMemory or Mem0

2. Implement Progressive Compression

Don't compress aggressively from the start. Use thresholds:

  • 0-70%: Store verbatim
  • 70-85%: Light compression (deduplication)
  • 85-95%: Medium compression (summarization)
  • 95-100%: Aggressive compression (hierarchical)

3. Combine Techniques

Single-technique approaches are suboptimal. Best production systems use:

  • Rolling summarization (short-term)
  • RAG retrieval (long-term)
  • Prompt caching (cost optimization)
  • Semantic deduplication (redundancy removal)

4. Monitor Quality Metrics

Track compression impact:

  • Response relevance score
  • Information retention rate
  • User satisfaction metrics
  • Token usage reduction

5. Use Prompt Caching Strategically

Cache stable content:

  • Conversation summaries
  • System prompts
  • Knowledge base context
  • User profiles

Don't cache frequently changing content:

  • Current user query
  • Real-time data
  • Session-specific state

6. Implement Checkpointing

Save compression state for:

  • Recovery from failures
  • Multi-session continuity
  • Analytics and debugging
  • A/B testing different strategies

7. Tune Compression Parameters

Test and optimize:

  • Summary token limits
  • Compression thresholds
  • Retrieval result counts
  • Cache TTLs
  • Chunk sizes for hierarchical

8. Handle Edge Cases

Plan for:

  • Very long messages (split or compress individually; see the splitting sketch after this list)
  • Code blocks (preserve formatting)
  • Multi-language content
  • Rapidly changing context
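
For the first point, a minimal splitting sketch using the ~4 characters-per-token heuristic (the budget and boundary choice are illustrative):

def split_long_message(content: str, max_tokens: int = 2000) -> list:
    """Split an oversized message into chunks under a rough token budget."""
    max_chars = max_tokens * 4  # ~4 characters per token
    chunks = []
    while content:
        # Prefer splitting at a paragraph boundary within the budget
        cut = content.rfind("\n\n", 0, max_chars)
        if cut <= 0 or len(content) <= max_chars:
            cut = max_chars
        chunks.append(content[:cut])
        content = content[cut:].lstrip()
    return chunks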

Troubleshooting

Problem: Summary loses critical information

Solutions:

  • Lower compression ratio (less aggressive)
  • Implement importance scoring to preserve key messages
  • Use extractive summarization for critical sections
  • Increase summary token budget

Problem: Retrieval returns irrelevant context

Solutions:

  • Improve embedding model quality
  • Add metadata filtering (timestamps, topics)
  • Adjust similarity threshold
  • Use hybrid search (semantic + keyword)

Problem: High latency from compression

Solutions:

  • Compress asynchronously (background tasks)
  • Use faster models for summarization (Haiku instead of Sonnet)
  • Cache summaries more aggressively
  • Reduce compression frequency

Problem: Conversations still exceeding context window

Solutions:

  • Implement hierarchical compression
  • Archive to vector database more aggressively
  • Use more aggressive compression ratios
  • Consider switching to model with larger context window

Problem: High costs despite compression

Solutions:

  • Implement prompt caching
  • Use cheaper models for summarization (Haiku)
  • Batch summarization operations
  • Reduce summarization frequency

Problem: Lost conversation continuity

Solutions:

  • Increase recent message window
  • Include summary in every request
  • Use more descriptive summaries
  • Implement session resumption with context injection

Advanced Topics

Streaming Compression

Compress in real-time as conversation progresses:

import asyncio

async def streaming_compress(conversation_stream):
    """Compress while streaming responses.

    Assumes a compressor exposing should_compress(), compress_async() and
    get_context() - async variants not shown in the ProgressiveCompressor
    sketch above.
    """
    compressor = ProgressiveCompressor()

    async for message in conversation_stream:
        compressor.add_message(message["role"], message["content"])

        # Compression happens asynchronously
        if compressor.should_compress():
            asyncio.create_task(compressor.compress_async())

    return compressor.get_context()

Multi-User Session Management

Handle concurrent conversations with shared context:

class MultiUserMemory:
    def __init__(self):
        self.user_sessions = {}

    def get_or_create_session(self, user_id: str):
        if user_id not in self.user_sessions:
            self.user_sessions[user_id] = HybridMemorySystem(...)
        return self.user_sessions[user_id]

    def cleanup_inactive_sessions(self, timeout: int = 3600):
        """Remove sessions inactive for > timeout seconds."""
        current_time = time.time()
        inactive = [
            user_id for user_id, session in self.user_sessions.items()
            if current_time - session.last_activity > timeout
        ]

        for user_id in inactive:
            self._archive_session(user_id)
            del self.user_sessions[user_id]

Custom Importance Scoring

Train ML models to score message importance:

from transformers import pipeline

class MLImportanceScorer:
    def __init__(self):
        # Use pre-trained classifier or fine-tune on your data
        self.classifier = pipeline(
            "text-classification",
            model="your-importance-model"
        )

    def score(self, message: dict) -> float:
        """Score message importance (0-1)."""
        result = self.classifier(message['content'])
        return result[0]['score']

Context Window Utilization Optimization

Maximize information density within token budget:

def optimize_context_allocation(max_tokens: int) -> dict:
    """
    Split the total token budget across context components.

    Optimal allocation (empirically tested):
    - 20% summary
    - 50% recent messages
    - 30% retrieved context
    """
    return {
        'summary': int(max_tokens * 0.20),
        'recent': int(max_tokens * 0.50),
        'retrieval': int(max_tokens * 0.30)
    }

Future Directions

Emerging Techniques (2025+)

1. Infinite Attention Mechanisms

  • Models with multi-million-token context windows (Gemini 1.5 Pro today, larger windows expected)
  • Reduces need for compression but doesn't eliminate cost concerns

2. Learned Compression Models

  • Neural networks trained to compress conversation optimally
  • Maintain semantic meaning while minimizing tokens
  • Examples: LLMLingua v2, PromptCompressor

3. Multimodal Session Compression

  • Compress conversations with images, audio, video
  • Maintain cross-modal context relationships

4. Federated Memory Systems

  • Distributed compression across multiple memory stores
  • Privacy-preserving compression for sensitive conversations

5. Adaptive Compression Strategies

  • RL-based systems that learn optimal compression per user/domain
  • Dynamic threshold adjustment based on conversation importance

References

Academic Papers

  • "Recursively Summarizing Enables Long-Term Dialogue Memory" (arXiv:2308.15022)
  • "LLMLingua: Compressing Prompts for Accelerated Inference" (arXiv:2310.05736)
  • "Lost in the Middle: How Language Models Use Long Contexts" (arXiv:2307.03172)



Last Updated: 2025-11-30
Version: 1.0.0
License: MIT