SKILL.md

name: kpler
description: Use when fetching oil/gas trade flow data from Kpler. Covers authentication, trade queries, flow aggregations, entity search, vessel positions, and company fleet data.

Kpler Trade Data API

Access oil and gas trade flows, vessel tracking, and company data from the Kpler terminal.

Prerequisites

  1. Kpler account with terminal access
  2. Store credentials in .env:
KPLER_USERNAME=user@example.com
KPLER_PASSWORD=your_password
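
Before creating a client, it can help to confirm that both variables are actually set. The following is a minimal sketch, assuming the credentials have already been loaded into the process environment (for example by python-dotenv). `missing_credentials` is a hypothetical helper for illustration, not part of kpler_client:

```python
import os

# Variable names taken from the .env example above
REQUIRED_VARS = ("KPLER_USERNAME", "KPLER_PASSWORD")

def missing_credentials(env=None):
    """Return the names of required variables that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

An empty list means both credentials are present; otherwise the list names what to add to .env.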

Setup

Copy the client module to your project:

cp .claude/skills/kpler/scripts/kpler_client.py scripts/

Install dependencies:

uv add httpx pyjwt python-dotenv

Quick start

import asyncio
from kpler_client import KplerClient

async def main():
    async with KplerClient() as client:
        # Search for a player
        results = await client.search("shell", categories=["PLAYER"])
        shell_id = results["players"][0]["entity"]["id"]

        # Query their trades
        trades = await client.query_trades(players=[shell_id], size=20)
        print(f"Found {len(trades['result']['trades'])} trades")

asyncio.run(main())

Authentication

The client handles Auth0 OAuth automatically:

  • Auto-login from .env credentials
  • Token storage in .kpler_token, .kpler_refresh_token
  • Automatic refresh 5 minutes before expiry
  • Retry on 401 with fresh token
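
The "refresh 5 minutes before expiry" rule can be illustrated with a short sketch. It assumes the stored access token is a JWT carrying a standard `exp` claim (the pyjwt dependency suggests this, but the client's actual implementation is not shown here). To stay self-contained it decodes the payload with the standard library instead of pyjwt, and it does not verify the signature. `token_expiry` and `needs_refresh` are hypothetical names:

```python
import base64
import json
import time

REFRESH_MARGIN_SECONDS = 5 * 60  # "5 minutes before expiry", per the list above

def token_expiry(token):
    """Read the `exp` claim from a JWT payload without verifying the signature."""
    payload_b64 = token.split(".")[1]
    payload_b64 += "=" * (-len(payload_b64) % 4)  # Restore stripped base64 padding
    payload = json.loads(base64.urlsafe_b64decode(payload_b64))
    return payload["exp"]

def needs_refresh(token, now=None, margin=REFRESH_MARGIN_SECONDS):
    """Return True once the token is within `margin` seconds of expiring."""
    now = time.time() if now is None else now
    return now >= token_expiry(token) - margin
```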

Manual auth control:

client = KplerClient()
await client.login("user@example.com", "password")  # Manual login
client.is_authenticated()  # Check status
await client.logout()  # Clear tokens

API methods

Search entities

Find players, vessels, installations, zones, and products by name:

results = await client.search(
    text="gazprom",
    categories=["PLAYER", "INSTALLATION"],  # Optional filter
    commodity_types=["lng"],  # Optional: lng, oil, lpg, dry
)

# Results grouped by type
for player in results.get("players", []):
    print(f"{player['entity']['name']} (ID: {player['entity']['id']})")

Categories: PLAYER, VESSEL, INSTALLATION, ZONE, PRODUCT
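
Indexing `results["players"][0]` raises an error when a search returns no matches. A small defensive sketch follows; `first_entity_id` is a hypothetical helper, and only the "players" and "zones" group keys appear in this document, so other group names are an assumption:

```python
def first_entity_id(results, group):
    """Return the ID of the first match in `group`, or None if there is none."""
    matches = results.get(group) or []
    if not matches:
        return None
    return matches[0]["entity"]["id"]
```

For example, `first_entity_id(results, "players")` returns None instead of raising when the search finds nothing.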

Query trades

Get individual trade records:

trades = await client.query_trades(
    # Pagination
    from_=0,
    size=100,

    # Filters (use IDs from search; string IDs are accepted and converted to ints)
    locations=[1234],  # Zone/installation IDs
    products=[5678],   # Product IDs
    players=[3836],    # Company IDs
    vessels=[9012],    # Vessel IDs

    # Trade status
    statuses=["completed"],  # ongoing, completed, cancelled
    trade_types=["import", "export"],

    # Options
    with_forecasted=False,
)

for trade in trades["data"]:
    origin = trade.get("portCallOrigin", {}).get("zone", {}).get("name", "Unknown")
    dest = trade.get("portCallDestination", {}).get("zone", {}).get("name", "Unknown")
    print(f"{origin} → {dest}")
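
Result sets larger than `size` need several requests. The sketch below pages through `query_trades` until a short page signals the end. It assumes `from_` is a record offset rather than a page index, which this document does not confirm. Because the examples here read trade records from two different places (`trades["data"]` above, `trades["result"]["trades"]` in the quick start), the hypothetical `extract_trades` helper accepts either shape:

```python
import asyncio

def extract_trades(response):
    """Pull the trade list out of either response shape used in this document."""
    if "data" in response:
        return response["data"]
    return response.get("result", {}).get("trades", [])

async def fetch_all_trades(client, page_size=100, max_pages=50, **filters):
    """Page through query_trades until a page comes back short."""
    trades = []
    for page in range(max_pages):
        response = await client.query_trades(
            from_=page * page_size,  # Assumption: from_ is a record offset
            size=page_size,
            **filters,
        )
        batch = extract_trades(response)
        trades.extend(batch)
        if len(batch) < page_size:
            break  # Last page reached
    return trades
```

Usage would look like `await fetch_all_trades(client, players=[3836], statuses=["completed"])`. The `max_pages` cap is a safety stop so a misbehaving response cannot loop forever.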

Query flows (aggregated)

Get aggregated flow data with time series:

flows = await client.query_flows(
    # Required
    direction="export",  # export, import
    granularity="months",  # years, months, weeks, days

    # Date range
    start_date="2024-01-01",
    end_date="2024-12-31",

    # Filters
    locations=[1234],
    products=[5678],
    players=[3836],

    # Split results by dimension
    split_on="countries",  # countries, ports, products, vessels, buyers, sellers, charterers

    # Options
    cumulative=False,
    forecasted=False,
    intra=False,  # Include intra-region flows
)

# Response format: series by date with split values
for entry in flows["series"]:
    period = entry["date"]  # One entry per period at the requested granularity
    for dataset in entry.get("datasets", []):
        for split in dataset.get("splitValues", []):
            vol = split["values"]["volume"]
            print(f"{period}: {split['name']} = {vol/1e6:.1f} Mt")

Split options: countries, ports, products, vessels, buyers, sellers, charterers
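
For loading into a DataFrame or database, the nested response is easier to handle as flat rows. The following is a sketch based on the response format shown above. `flatten_flows` is a hypothetical helper, and it looks for the series under both `flows["series"]` and `flows["result"]["series"]`, since both spellings appear in this document:

```python
def flatten_flows(flows):
    """Turn a query_flows response into (date, split_name, volume) rows."""
    # Accept the series at the top level or nested under "result"
    series = flows.get("series") or flows.get("result", {}).get("series", [])
    rows = []
    for entry in series:
        for dataset in entry.get("datasets", []):
            for split in dataset.get("splitValues", []):
                rows.append(
                    (entry["date"], split["name"], split["values"]["volume"])
                )
    return rows
```

Volumes are passed through unchanged, so any unit conversion stays with the caller.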

Get vessel positions

Raw AIS tracking data:

positions = await client.get_vessel_positions(
    vessel_id=12345,
    start_date="2024-01-01T00:00:00Z",  # ISO 8601
    end_date="2024-01-31T23:59:59Z",
    limit=1000,
)

for pos in positions:
    print(f"{pos['timestamp']}: {pos['lat']}, {pos['lon']}")
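
If a long date range could return more fixes than `limit` allows (how the cap behaves is an assumption here, not something this document states), one option is to request shorter windows and concatenate the results. The sketch below only builds the windows; `date_windows` and `to_iso_z` are hypothetical helpers:

```python
from datetime import datetime, timedelta, timezone

def date_windows(start, end, days=7):
    """Split [start, end] into consecutive windows of at most `days` days."""
    windows = []
    step = timedelta(days=days)
    cursor = start
    while cursor < end:
        window_end = min(cursor + step, end)
        windows.append((cursor, window_end))
        cursor = window_end
    return windows

def to_iso_z(moment):
    """Format an aware datetime as ISO 8601 in UTC with a trailing Z."""
    return moment.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
```

Each pair can then be passed as `start_date=to_iso_z(a), end_date=to_iso_z(b)`. Adjacent windows share a boundary timestamp, so a fix falling exactly on a boundary may appear twice and should be de-duplicated.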

Get player fleet

Company fleet information:

fleet = await client.get_player_fleet(player_id=3836)

print(f"Company: {fleet['name']}")
print(f"Vessels owned: {len(fleet.get('ownedVessels', []))}")
print(f"Subsidiaries: {len(fleet.get('subsidiaries', []))}")

Query contracts

Long-term agreements and tenders:

contracts = await client.query_contracts(
    types=["SPA", "LTA"],  # Tender, SPA, LTA, TUA
    players=[3836],
    from_=0,
    size=50,
)

ETL pattern for notebooks

Typical workflow for building a DuckDB database:

# scripts/fetch_kpler.py
import asyncio
from pathlib import Path

import duckdb
from dotenv import load_dotenv
from kpler_client import KplerClient

load_dotenv()  # Load KPLER_USERNAME / KPLER_PASSWORD from .env

async def fetch_data():
    async with KplerClient() as client:
        # Look up the zone ID instead of hard-coding it
        russia = await client.search("russia", categories=["ZONE"])
        russia_id = russia["zones"][0]["entity"]["id"]

        # Monthly export flows, split by destination country
        flows = await client.query_flows(
            direction="export",
            locations=[russia_id],
            granularity="months",
            start_date="2020-01-01",
            end_date="2024-12-31",
            split_on="countries",
        )

        return flows["result"]["series"]

def build_database(data):
    # duckdb.connect() does not create missing parent directories
    Path("data").mkdir(exist_ok=True)
    con = duckdb.connect("data/data.duckdb")
    con.execute("""
        CREATE OR REPLACE TABLE flows (
            date DATE,
            destination VARCHAR,
            volume_kt DOUBLE
        )
    """)

    # One row per (series, data point); adjust the keys if your response is shaped differently
    rows = [
        (point["date"], series["name"], point["value"])
        for series in data
        for point in series.get("data", [])
    ]
    if rows:
        con.executemany("INSERT INTO flows VALUES (?, ?, ?)", rows)

    con.close()

if __name__ == "__main__":
    data = asyncio.run(fetch_data())
    build_database(data)

Add to Makefile:

data:
	uv run python scripts/fetch_kpler.py

Rate limits

The Kpler API has rate limits. Add delays for large queries:

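import asyncio

async def fetch_trades_for_players(client, player_ids):
    trades_by_player = {}
    for player_id in player_ids:
        trades_by_player[player_id] = await client.query_trades(players=[player_id])
        await asyncio.sleep(0.5)  # Rate limit protection
    return trades_by_player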
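
A fixed delay reduces the chance of being rate limited but does not recover once a request has been rejected. One common complement is to retry with exponentially growing pauses. This is a generic sketch, not something the client is known to do. `with_backoff` is a hypothetical helper, and because the exception the client raises when rate limited is not documented here, `retry_on` defaults to any `Exception` and should be narrowed in practice:

```python
import asyncio

async def with_backoff(make_call, attempts=4, base_delay=0.5, retry_on=(Exception,)):
    """Await make_call(), retrying with exponentially growing pauses on failure."""
    for attempt in range(attempts):
        try:
            return await make_call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # Out of retries: surface the last error
            # Pauses of base_delay, 2x, 4x, ... between attempts
            await asyncio.sleep(base_delay * 2 ** attempt)
```

Usage would look like `await with_backoff(lambda: client.query_trades(players=[player_id]))`. The lambda matters: each retry needs a fresh coroutine, so the call is wrapped and not awaited directly.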

Common entity IDs

Search to find current IDs, as these may change:

Entity          Example search   Type
Russia          russia           ZONE
China           china            ZONE
Shell           shell            PLAYER
TotalEnergies   total            PLAYER
Crude oil       crude            PRODUCT
LNG             lng              PRODUCT

Troubleshooting

401 Unauthorized: Credentials invalid or expired. Check .env file.

Empty results: Verify entity IDs are correct. Use search() to find current IDs.

Token errors: Delete the stored token files (.kpler_token and .kpler_refresh_token) and re-authenticate.

Rate limited: Add delays between requests. The API may temporarily block rapid queries.

API reference

Base URL: https://terminal.kpler.com/api

Endpoint                  Method   Purpose
/trades                   POST     Query trade records
/flows                    POST     Aggregate flow data
/contracts                GET      Contract data
/players/{id}             GET      Company fleet info
/vessels/{id}/positions   GET      AIS positions
/graphql/                 POST     Entity search