Claude Code Plugins

Community-maintained marketplace

Feedback

pulse-mcp-stream

@plurigrid/asi
0
0

Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name pulse-mcp-stream
description Layer 1 Real-Time Social Stream Monitoring via MCP with DuckDB persistence

pulse-mcp-stream

Layer 1: Real-Time Social Stream Monitoring via MCP

Version: 1.1.0 (music-topos enhanced) Trit: +1 (Generator - produces live data) Bundle: acquisition

Overview

Pulse-MCP-stream provides real-time monitoring of social interactions, enabling the cognitive surrogate system to stay updated with the latest patterns. It streams mentions, engagement changes, and trending topics.

Enhanced Integration: MCP + DuckDB

MCP Server (TypeScript)

// pulse-mcp-server/src/index.ts
import { Server } from "@modelcontextprotocol/sdk/server";
import { Firehose } from "@atproto/sync";
import * as duckdb from "duckdb";

const server = new Server({
  name: "pulse-mcp-stream",
  version: "1.0.0"
});

// Connect to DuckDB for persistence
const db = new duckdb.Database("pulse_stream.duckdb");

server.setRequestHandler("subscribe", async (params) => {
  const { actor, filters } = params;
  
  const firehose = new Firehose({
    service: "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos"
  });
  
  firehose.on("create", async (event) => {
    if (event.author === actor) {
      // Store in DuckDB
      await db.run(`
        INSERT INTO pulse_events (event_id, event_type, actor_did, text, created_at)
        VALUES (?, ?, ?, ?, CURRENT_TIMESTAMP)
      `, [event.uri, event.type, event.author, event.record?.text]);
    }
  });
  
  await firehose.start();
  return { status: "subscribed", actor };
});

DuckDB Schema

CREATE TABLE pulse_events (
    event_id VARCHAR PRIMARY KEY,
    event_type VARCHAR,  -- 'post', 'reply', 'like', 'repost', 'mention'
    actor_did VARCHAR,
    actor_handle VARCHAR,
    subject_uri VARCHAR,
    text TEXT,
    created_at TIMESTAMP,
    ingested_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    gay_color VARCHAR  -- Deterministic color via SPI seed
);

CREATE TABLE engagement_deltas (
    delta_id VARCHAR PRIMARY KEY,
    post_uri VARCHAR,
    likes_delta INT,
    reposts_delta INT,
    replies_delta INT,
    velocity FLOAT,  -- engagements per minute
    measured_at TIMESTAMP
);

-- Real-time velocity tracking
CREATE VIEW v_post_velocity AS
SELECT 
    post_uri,
    COUNT(*) FILTER (WHERE event_type = 'like') as likes,
    COUNT(*) FILTER (WHERE event_type = 'repost') as reposts,
    COUNT(*) / (EXTRACT(EPOCH FROM (MAX(created_at) - MIN(created_at))) / 60.0) as velocity_per_min
FROM pulse_events
WHERE created_at > NOW() - INTERVAL '1 hour'
GROUP BY post_uri;

Python Client

# pulse_client.py
import asyncio
import duckdb
from dataclasses import dataclass
from typing import AsyncIterator

@dataclass
class PulseEvent:
    event_id: str
    event_type: str
    actor: str
    text: str
    created_at: str

class PulseClient:
    def __init__(self, db_path: str = "pulse_stream.duckdb", seed: int = 0xf061ebbc2ca74d78):
        self.db = duckdb.connect(db_path)
        self.seed = seed
    
    async def subscribe_actor(self, actor: str) -> AsyncIterator[PulseEvent]:
        """Subscribe to real-time updates for a user."""
        # Poll DuckDB for new events
        last_id = ""
        while True:
            result = self.db.execute("""
                SELECT * FROM pulse_events 
                WHERE actor_handle = ? AND event_id > ?
                ORDER BY created_at
                LIMIT 10
            """, [actor, last_id]).fetchall()
            
            for row in result:
                last_id = row[0]
                yield PulseEvent(*row[:5])
            
            await asyncio.sleep(1)
    
    async def detect_trends(self, center_user: str, window_minutes: int = 60):
        """Detect trending topics in user's network."""
        return self.db.execute("""
            WITH word_counts AS (
                SELECT 
                    UNNEST(STRING_SPLIT(LOWER(text), ' ')) as word,
                    COUNT(*) as mentions
                FROM pulse_events
                WHERE created_at > NOW() - INTERVAL ? MINUTE
                GROUP BY word
            )
            SELECT word, mentions
            FROM word_counts
            WHERE LENGTH(word) > 3
            ORDER BY mentions DESC
            LIMIT 10
        """, [window_minutes]).fetchall()

Ruby Integration

# lib/pulse_stream.rb
require 'duckdb'

module PulseStream
  def self.connect(db_path: "pulse_stream.duckdb")
    @db = DuckDB::Database.open(db_path)
    @conn = @db.connect
  end
  
  def self.latest_events(actor:, limit: 10)
    @conn.query(<<~SQL, actor, limit)
      SELECT event_id, event_type, text, created_at
      FROM pulse_events
      WHERE actor_handle = ?
      ORDER BY created_at DESC
      LIMIT ?
    SQL
  end
  
  def self.velocity(post_uri:)
    result = @conn.query(<<~SQL, post_uri)
      SELECT velocity_per_min FROM v_post_velocity WHERE post_uri = ?
    SQL
    result.first&.first || 0.0
  end
  
  def self.viral?(post_uri:, threshold: 5.0)
    velocity(post_uri: post_uri) > threshold
  end
end

GF(3) Triad Integration

Trit Skill Role
-1 influence-propagation Validates network patterns
0 bisimulation-game Coordinates equivalence
+1 pulse-mcp-stream Generates live data

Conservation: (-1) + (0) + (+1) = 0 ✓

MCP Configuration

{
  "mcpServers": {
    "pulse": {
      "command": "node",
      "args": ["pulse-mcp-server/dist/index.js"],
      "env": {
        "DUCKDB_PATH": "pulse_stream.duckdb",
        "GAY_SEED": "0xf061ebbc2ca74d78"
      }
    }
  }
}

Justfile Recipes

# Start pulse stream
pulse-start actor="barton.bsky.social":
    python3 -c "import asyncio; from pulse_client import PulseClient; asyncio.run(PulseClient().subscribe_actor('{{actor}}'))"

# Check velocity
pulse-velocity uri:
    ruby -I lib -r pulse_stream -e "PulseStream.connect; puts PulseStream.velocity(post_uri: '{{uri}}')"

# Detect trends
pulse-trends window="60":
    duckdb pulse_stream.duckdb -c "SELECT * FROM v_post_velocity WHERE velocity_per_min > 1.0 LIMIT 10"

Related Skills

  • atproto-ingest (Layer 1) - Batch data collection
  • influence-propagation (Layer 7) - Network analysis
  • cognitive-surrogate (Layer 6) - Pattern consumption
  • duckdb-temporal-versioning - Time-travel queries