| name | pulse-mcp-stream |
| description | Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence |
pulse-mcp-stream
Layer 1: Real-Time Social Stream Monitoring via MCP
Version: 1.1.0 (music-topos enhanced) Trit: +1 (Generator - produces live data) Bundle: acquisition
Overview
Pulse-MCP-stream provides real-time monitoring of social interactions, enabling the cognitive surrogate system to stay updated with the latest patterns. It streams mentions, engagement changes, and trending topics.
Enhanced Integration: MCP + DuckDB
MCP Server (TypeScript)
// pulse-mcp-server/src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server";
import { Firehose } from "@atproto/sync";
import * as duckdb from "duckdb";
const server = new Server({
name: "pulse-mcp-stream",
version: "1.0.0"
});
// Connect to DuckDB for persistence
const db = new duckdb.Database("pulse_stream.duckdb");
server.setRequestHandler("subscribe", async (params) => {
const { actor, filters } = params;
const firehose = new Firehose({
service: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
});
firehose.on("create", async (event) => {
if (event.author === actor) {
// Store in DuckDB
await db.run(`
INSERT INTO pulse_events (event_id, event_type, actor_did, text, created_at)
VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
`, [event.uri, event.type, event.author, event.record?.text]);
}
});
await firehose.start();
return { status: "subscribed", actor };
});
DuckDB Schema
CREATE TABLE pulse_events (
event_id VARCHAR PRIMARY KEY,
event_type VARCHAR, -- 'post', 'reply', 'like', 'repost', 'mention'
actor_did VARCHAR,
actor_handle VARCHAR,
subject_uri VARCHAR,
text TEXT,
created_at TIMESTAMP,
ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
gay_color VARCHAR -- Deterministic color via SPI seed
);
CREATE TABLE engagement_deltas (
delta_id VARCHAR PRIMARY KEY,
post_uri VARCHAR,
likes_delta INT,
reposts_delta INT,
replies_delta INT,
velocity FLOAT, -- engagements per minute
measured_at TIMESTAMP
);
-- Real-time velocity tracking
CREATE VIEW v_post_velocity AS
SELECT
post_uri,
COUNT(*) FILTER (WHERE event_type = 'like') as likes,
COUNT(*) FILTER (WHERE event_type = 'repost') as reposts,
COUNT(*) / (EXTRACT(EPOCH FROM (MAX(created_at) - MIN(created_at))) / 60.0) as velocity_per_min
FROM pulse_events
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY post_uri;
Python Client
# pulse_client.py
import asyncio
import duckdb
from dataclasses import dataclass
from typing import AsyncIterator
@dataclass
class PulseEvent:
event_id: str
event_type: str
actor: str
text: str
created_at: str
class PulseClient:
def __init__(self, db_path: str = "pulse_stream.duckdb", seed: int = 0xf061ebbc2ca74d78):
self.db = duckdb.connect(db_path)
self.seed = seed
async def subscribe_actor(self, actor: str) -> AsyncIterator[PulseEvent]:
"""Subscribe to real-time updates for a user."""
# Poll DuckDB for new events
last_id = ""
while True:
result = self.db.execute("""
SELECT * FROM pulse_events
WHERE actor_handle = ? AND event_id > ?
ORDER BY created_at
LIMIT 10
""", [actor, last_id]).fetchall()
for row in result:
last_id = row[0]
yield PulseEvent(*row[:5])
await asyncio.sleep(1)
async def detect_trends(self, center_user: str, window_minutes: int = 60):
"""Detect trending topics in user's network."""
return self.db.execute("""
WITH word_counts AS (
SELECT
UNNEST(STRING_SPLIT(LOWER(text), ' ')) as word,
COUNT(*) as mentions
FROM pulse_events
WHERE created_at > NOW() - INTERVAL ? MINUTE
GROUP BY word
)
SELECT word, mentions
FROM word_counts
WHERE LENGTH(word) > 3
ORDER BY mentions DESC
LIMIT 10
""", [window_minutes]).fetchall()
Ruby Integration
# lib/pulse_stream.rb
require 'duckdb'
module PulseStream
def self.connect(db_path: "pulse_stream.duckdb")
@db = DuckDB::Database.open(db_path)
@conn = @db.connect
end
def self.latest_events(actor:, limit: 10)
@conn.query(<<~SQL, actor, limit)
SELECT event_id, event_type, text, created_at
FROM pulse_events
WHERE actor_handle = ?
ORDER BY created_at DESC
LIMIT ?
SQL
end
def self.velocity(post_uri:)
result = @conn.query(<<~SQL, post_uri)
SELECT velocity_per_min FROM v_post_velocity WHERE post_uri = ?
SQL
result.first&.first || 0.0
end
def self.viral?(post_uri:, threshold: 5.0)
velocity(post_uri: post_uri) > threshold
end
end
GF(3) Triad Integration
| Trit | Skill | Role |
|---|---|---|
| -1 | influence-propagation | Validates network patterns |
| 0 | bisimulation-game | Coordinates equivalence |
| +1 | pulse-mcp-stream | Generates live data |
Conservation: (-1) + (0) + (+1) = 0 ✓
MCP Configuration
{
"mcpServers": {
"pulse": {
"command": "node",
"args": ["pulse-mcp-server/dist/index.js"],
"env": {
"DUCKDB_PATH": "pulse_stream.duckdb",
"GAY_SEED": "0xf061ebbc2ca74d78"
}
}
}
}
Justfile Recipes
# Start pulse stream
pulse-start actor="barton.bsky.social":
python3 -c "import asyncio; from pulse_client import PulseClient; asyncio.run(PulseClient().subscribe_actor('{{actor}}'))"
# Check velocity
pulse-velocity uri:
ruby -I lib -r pulse_stream -e "PulseStream.connect; puts PulseStream.velocity(post_uri: '{{uri}}')"
# Detect trends
pulse-trends window="60":
duckdb pulse_stream.duckdb -c "SELECT * FROM v_post_velocity WHERE velocity_per_min > 1.0 LIMIT 10"
Related Skills
atproto-ingest(Layer 1) - Batch data collectioninfluence-propagation(Layer 7) - Network analysiscognitive-surrogate(Layer 6) - Pattern consumptionduckdb-temporal-versioning- Time-travel queries