Claude Code Plugins

Community-maintained marketplace

Feedback

nerves-patterns

@layeddie/ai-rules
0
0

Nerves embedded development patterns for Elixir/Nerves systems

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name nerves-patterns
description Nerves embedded development patterns for Elixir/Nerves systems

Nerves Patterns Skill

Use this skill when:

  • Creating embedded Elixir applications for Nerves
  • Implementing hardware integration
  • Working with Nerves runtime environment
  • Developing firmware and peripherals
  • Optimizing for memory and performance
  • Implementing real-time constraints

Nerves Architecture Basics

System Structure

Firmware Application Structure

# ✅ Good: Layered firmware
defmodule Tensioner.Firmware.Application do
  use Application

  def start(_type, _args) do
    children = [
      Tensioner.SystemSupervisor
      Tensioner.Hardware.Supervisor
    ]
    
    opts = [strategy: :one_for_one]
    Supervisor.start_link(__MODULE__, children, opts)
  end
end

Supervision Tree Pattern

# ✅ Good: Domain-specific supervisors
defmodule Tensioner.Application do
  use Application

  def start(_type, _args) do
    children = [
      # One supervisor per domain
      Tensioner.Accounts.Supervisor,
      Tensioner.Tension.Supervisor,
      Tensioner.Firmware.Supervisor,
      Tensioner.UI.Supervisor
    ]
    
    # Each domain has its own supervisor
    Supervisor.start_link(__MODULE__, children, strategy: :one_for_one)
  end
end

Application Callback

# ✅ Good: Handle Nerves-specific lifecycle events
defmodule Tensioner.Application do
  use Application

  @impl true
  def start(_type, _args) do
    # Configure logger for Nerves
    Logger.configure(level: :info)

    # Start application
    {:ok, _} = start(_type, _args)
  end

  @impl true
  def stop(_state) do
    # Graceful shutdown of all processes
    Logger.info("Stopping Tensioner firmware...")
    {:ok, _} = stop(_state)
  end
end

Nerves Runtime Environment

System Supervision

# ✅ Good: Using Nerves runtime supervision
defmodule Tensioner.SystemSupervisor do
  use Supervisor

  def init(_opts) do
    {:ok, %{children: []}}
  end
end

Hardware Abstraction Layer

# ✅ Good: Generic hardware interface
defmodule Tensioner.Hardware.Supervisor do
  use GenServer

  def start_link(gpio_pins \\ %{}) do
    GenServer.start_link(__MODULE__, gpio_pins)
  end

  def init(gpio_pins) do
    {:ok, gpio_pins}
  end

  @impl true
  def handle_call({:set_direction, pin, direction}, _from, state) do
    # Set GPIO pin direction
    {:reply, :ok, gpio_pins, state}
  end

  @impl true
  def handle_cast({:read_all, _from}, state) do
    # Read all GPIO pin states
    {:noreply, state}
  end
end

# ❌ Bad: Hardware-specific controller in application
defmodule Tensioner.BadHardware do
  use GenServer

  def start_link do
    GenServer.start_link(__MODULE__, [])
  end

  @impl true
  def init(_opts) do
    {:ok, %{hardware: :custom_controller}}
  end
end

GPIO Patterns

Basic GPIO Control

# ✅ Good: Nerves.GPIO for GPIO
defmodule Tensioner.GPIOPin do
  alias Nerves.Runtime.GPIO

  def set_direction(pin, direction) do
    {:ok, Nerves.GPIO.write(pin, direction)}
  end

  def read(pin) do
    Nerves.GPIO.read(pin)
  end

  def write(pin, level \\ 0) do
    Nerves.GPIO.write(pin, level)
  end

  # Input with pull-down
  def input_with_pullup(pin) do
    Nerves.GPIO.set_input(pin, :input)
    Nerves.GPIO.set_direction(pin, :input, :pullup)
  end

  # Output with open-drain
  def output_with_opendrain(pin) do
    Nerves.GPIO.set_direction(pin, :output, :opendrain)
  end

Advanced GPIO Patterns

Pattern 1: State Machine with GenServer

defmodule Tensioner.GPIOSwitch do
  use GenServer

  defstruct do
    field :state, :idle | :active | :disabled
    field :pin, atom()
    field :last_changed_at, DateTime.t()
  end

  def start_link(pin) do
    GenServer.start_link(__MODULE__, %{pin: pin, state: :idle})
  end

  @impl true
  def init(%{pin: pin, state: state}) do
    {:ok, %{pin: pin, state: state}}
  end

  @impl true
  def handle_cast({:toggle, pin}, _from, state) do
    new_state = toggle_state(state)
    {:noreply, %{state | pin: pin, state: new_state}}
  end

  @impl true
  def handle_info({:gpio_interrupt, pin, _state}, state) do
    # GPIO interrupt occurred
    Logger.warning("GPIO #{inspect(pin)} interrupt")
    {:noreply, state}
  end

  @impl true
  defp toggle_state(current) do
    case current do
      :idle -> {:active, :disabled}
      :active -> {:disabled, :active}
      :disabled -> {:active, :idle}
    end
  end
end

Pattern 2: Edge Detection with Interrupts

defmodule Tensioner.GPIODetection do
  use GenServer

  def start_link(pins) when is_list(pins) do
    GenServer.start_link(__MODULE__, pins)
  end

  def init(pins) do
    {:ok, %{pins: pins}}
  end

  @impl true
  def handle_info({:gpio_interrupt, pin, _from, state}) do
    # Edge detected on pin
    Logger.info("Edge detected on pin #{inspect(pin)}")
    
    # Handle edge (trigger debouncing)
    debounce_edge_detection(pin)
    
    {:noreply, state}
  end

  defp debounce_edge_detection(pin) do
    # Ignore rapid successive interrupts
    Process.send_after(self(), {:reset_debounce, pin}, 50)
    {:noreply, state}
  end

  @impl true
  def handle_info({:reset_debounce, pin}, _from, state) do
    # Clear debounce timer
    Process.cancel_timer(ref(state.debounce_timer))
    state = %{state | debounce_timer: nil}
    {:noreply, state}
  end
end

Pattern 3: PWM (Pulse Width Modulation)

# ✅ Good: Use GenServer for PWM control
defmodule Tensioner.PWMController do
  use GenServer

  def start_link(pwm_pin) do
    GenServer.start_link(__MODULE__, %{pwm_pin: pwm_pin, duty_cycle: 1000, duty: 50})
  end

  def init(%{pwm_pin: pwm_pin, duty_cycle: 50}) do
    {:ok, %{pwm_pin: pwm_pin, duty_cycle: duty_cycle, state: :on}}
  end

  @impl true
  def handle_cast({:set_duty_cycle, duty_cycle}, _from, state) do
    new_duty_cycle = duty_cycle
    
    # Clamp duty cycle between 10-90% (hardware limits)
    clamped_duty_cycle = max(10, min(90, new_duty_cycle))
    
    Nerves.PWM.set_duty_cycle(pwm_pin, clamped_duty_cycle)
    
    {:noreply, %{state | duty_cycle: new_duty_cycle}}
  end

  def handle_cast({:set_brightness, brightness, _from, state) do
    # PWM for brightness control
    Nerves.PWM.set_brightness(pwm_pin, brightness)
    {:noreply, state}
  end

  @impl true
  def handle_info({:pwm_cycle_complete, _from, state}) do
    Logger.debug("PWM cycle complete for pin #{inspect(pwm_pin)}")
    {:noreply, state}
  end
end

I2C (Inter-Integrated Circuit) Communication

UART Serial Communication

# ✅ Good: Using Nerves.UART for reliable UART
defmodule Tensioner.UART.Port do
  use GenServer

  def start_link(uart_name, uart_opts \\ []) do
    GenServer.start_link(__MODULE__, uart_name)
  end

  @impl true
  def init(uart_name) do
    # Configure UART with Nerves.UART
    case Nerves.UART.start_link(uart_name, uart_opts) do
      {:ok, _} = uart_name
      :error -> {:stop, _} = uart_name
    end
    end

  @impl true
  def handle_cast({:send, data}, _from, state) do
    Nerves.UART.write(uart_name, data)
    {:noreply, state}
  end

  @impl true
  def handle_info({: {:data_available, data, uart_name}, _from, state} do
    Logger.debug("Data from #{inspect(uart_name)}: #{inspect(data)}")
    
    {:noreply, state}
  end
end

# ❌ Bad: Direct port manipulation
defmodule Tensioner.BadUART do
  def write_raw(uart_name, port, data) do
    # Unsafe: Direct hardware access
    # Should use Nerves.UART abstraction
    :io.format("Writing to port #{port}")
    Port.write(port, data)
  end
end

SPI (Serial Peripheral Interface)

# ✅ Good: Using Nerves.SPI for SPI device communication
defmodule Tensioner.SPI.Dev do
  use GenServer

  def start_link(device_name, spi_opts \\ []) do
    GenServer.start_link(__MODULE__, device_name, spi_opts)
  end

  @impl true
  def init(device_name, spi_opts) do
    case Nerves.SPI.start_link(device_name, spi_opts) do
      {:ok, _} = device_name
      :error -> {:stop, _} = device_name
      end
 end
  end

  @impl true
  def handle_cast({:transfer, data, device_name}, _from, state) do
    # Transfer data via SPI
    Nerves.SPI.transfer(device_name, data)
    {:noreply, state}
  end

  @impl true
  def handle_info({:transfer_complete, _from, state}) do
    Logger.debug("SPI transfer complete for #{inspect(device_name)}")
    {:noreply, state}
  end
end

Sensor Integration

I2C Sensor (ADC)

# ✅ Good: Polling I2C sensor with Nerves.I2C
defmodule Tensioner.I2CSensor do
  use GenServer

  def start_link(channel_config) do
    GenServer.start_link(__MODULE__, channel_config)
  end

  @impl true
  def init(channel_config) do
    # Configure I2C with Nerves.I2C
    Nerves.I2C.start_link(channel_config)
    {:ok, _} = :adc_channel
      :error -> {:stop, _} = :adc_channel
    end
  end

  @impl true
  def handle_call({:read_channel, _from, state}) do
    {:reply, Nerves.I2C.read_channel(channel)}
  end

  @impl true
  def handle_info({:value_update, value, _from, state}) do
    # ADC value received
    Logger.debug("ADC channel value: #{value}")
    
    {:noreply, %{state | last_value: value}}
  end

  defp convert_adc_value(value_raw) do
    # Convert ADC raw value to voltage
    # Assuming 12-bit ADC, range 0-4095 (0-10V)
    # Convert to voltage: (value_raw * 4095) / 4095) * 5) / 4096) / 4096)
    round(Float.round(value_raw * 5) / 4096, 1))
  end
end

Temperature Sensor (DS18B20)

# ✅ Good: Using Nerves.DS18B20 for temperature sensing
defmodule Tensioner.TemperatureSensor do
  use GenServer

  def start(bus_name) do
    GenServer.start_link(__MODULE__, bus_name)
  end

  @impl true
  def init(bus_name) do
    # Configure temperature sensor
    Nerves.DS18B20.start_link(bus_name)
    {:ok, _} = :temperature_sensor
      :error -> {:stop, _} = :temperature_sensor
      :timeout -> {:stop, _} = :temperature_sensor
      :ready -> :ready, _} = :temperature_sensor
      :error -> {:error, _} = :temperature_sensor
      :timeout -> {:timeout, _} = :temperature_sensor
      :ready -> :ready, _} = :temperature_sensor
      :error -> {:error, _} = :temperature_sensor
      :timeout -> {:stop, _} = :temperature_sensor
      :ready -> :ready, _} = :temperature_sensor
      :error -> {:error, _} - } = :temperature_sensor
      :timeout -> {:stop, _end} = :temperature_sensor
  end
 end
  end

  @impl true
  def handle_call({:read_temperature, _from, state}) do
    {:reply, Nerves.DS18B20.read(bus_name)}
  end

  @impl true
  def handle_info({:temperature_update, temperature, _from, state}) do
    Logger.debug("Temperature update: #{temperature}°C")
    
    {:noreply, %{state | last_temperature: temperature}}
  end
end

Load Cell (HX711)

# ✅ Good: Reading load cell with Nerves.HX711
defmodule Tensioner.LoadCellSensor do
  use GenServer

  def start_link(cell_config) do
    GenServer.start_link(__MODULE__, cell_config)
  end

  @impl true
  def init(cell_config) do
    Nerves.HX711.start_link(cell_config)
    {:ok, _} = :load_cell
  end

  @impl true
  def handle_call({:read_load, _from, state}) do
    {:reply, Nerves.HX711.read()}
  end
  end
end

Memory Optimization

Nerves Runtime Memory

# ✅ Good: Minimal memory footprint
defmodule MyApp.Application do
  def start(_type, _args) do
    children = [
      # Critical processes only
      MyApp.Core,
      MyApp.Hardware,
      MyApp.Sensors
    ]
  end

  @impl true
  def init(_opts) do
    # Configure logger for Nerves (compact logs)
    Logger.configure(
      level: :warn,
      truncate: 8192,  # Limit log size
      metadata: []
    )
    {:ok, _}
  end
end

Binary Size Optimization

# ✅ Good: Use :atom vs binary for small strings
# ❌ Bad: Converting large data to atoms
defmodule MyApp.BadMemory do
  def process_data(data) do
    Enum.map(data, &String.to_atom/1)
    # Will exhaust atom table!
  end
end

# ✅ Good: Use binaries for frequently accessed data
defmodule MyApp.GoodMemory do
  def process_data(data) do
    # Process data, prefer binary strings
    Enum.map(data, &to_string/1)
  end
end

Process Optimization

Use Task.async for non-critical operations

# ✅ Good: Async background work
defmodule MyApp.Tasks do
  def cleanup_old_logs do
    Task.start(fn ->
      # Clean old log files in background
      File.ls!("log/old/")
      |> Enum.each(fn file ->
          File.rm!(file)
      end)
    end)
  end
end

# ❌ Bad: Blocking synchronous cleanup
defmodule MyApp.BadTasks do
  def cleanup_old_logs do
    # Blocks process
    File.ls!("log/old/")
    |> Enum.each(fn file ->
      File.rm!(file)  end)
  end
end

Firmware Update Patterns

Zero-Downtime Updates

# ✅ Good: A/B partitions for firmware
defmodule Tensioner.Firmware.ABUpdater do
  use GenServer

  def start_link() do
    GenServer.start_link(__MODULE__, [])
  end

  @impl true
  def init(_opts) do
    {:ok, %{}}
  end

  @impl true
  def handle_cast({:update_available, version, _from, state}) do
    new_state = %{state | available: version}
    {:noreply, new_state}
  end

  @impl true
  def handle_info({:download_progress, progress, version, _from, state) do
    case progress do
      :downloading -> Logger.info("Downloading firmware v#{version}: #{progress}%")
      :downloading -> Logger.info("Firmware v#{version} downloaded successfully")
      :complete -> Logger.info("Firmware v#{version} ready to deploy")
      :failed -> Logger.error("Firmware v#{version} download failed")
    end
    
    {:noreply, state}
  end
end

# Blue-Green deployment strategy
defmodule MyApp.Deployer.BlueGreen do
  use GenServer

  def update_firmware(new_version) do
    # Deploy to green slot
    GenServer.call(__MODULE__, {:update, new_version})
  end

  def check_health(conn) do
    case MyApp.Deployer.HealthCheck.health_check() do
      :ok -> conn |> put_status(200)
      :degraded -> conn |> put_status(503)
      _ -> conn |> put_status(503)
    end
  end
end

Factory Calibration

# ✅ Good: Calibration profiles stored in Nerves KV
defmodule Tensioner.Calibration do
  def save_calibration(hardware_type, calibration_data) do
    # Save calibration to Nerves KV
    calibration_key = "calibration:#{hardware_type}"
    
    Nerves.KV.put(calibration_key, Jason.encode!(calibration_data))
  end

  def load_calibration(hardware_type) do
    calibration_key = "calibration:#{hardware_type}"
    
    case Nerves.KV.get(calibration_key) do
      {:ok, data} -> data = Jason.decode!(data)
      _ -> {:error, :not_found}
    end
  end
end

Monitoring and Logging

System Health Checks

# ✅ Good: Health check endpoint with telemetry
defmodule Tensioner.HealthCheck do
  use Plug.Router

  get "/health", to: TensionerWeb.HealthController, :index
end
end

# ✅ Good: Structured logging with Telemetry
defmodule MyApp.Logger do
  require Logger

  def log_user_action(user_id, action, metadata) do
  info("User action",
    user_id: user_id,
    action: action,
    metadata: metadata,
    extra: %{timestamp: System.system_time(:millisecond)}
  )
  end

# ❌ Bad: Print debugging in production
defmodule MyApp.BadLogger do
  require Logger

  def log_user_action(user_id, action, metadata) do
    info("User action",
      user_id: user_id,
      action: action,
      metadata: metadata)
    IO.inspect("Debug: #{action} by user #{user_id}")
  end
end

Error Monitoring

# ✅ Good: Sentry integration for production
defmodule MyApp.Sentry do
  require Sentry

  def configure_sentry do
    Sentry.configure(
      dsn: System.get_env("SENTRY_DSN"),
      environment_name: "production",
      sample_rate: 0.5,
      tags: [:elixir, :nerves]
    )
  end

  def capture_exception(exception, stacktrace, metadata) do
    Sentry.capture_exception(exception, stacktrace, metadata)
    # Additional context
    :ok, _} = exception
  end
end

# ✅ Good: Telemetry metrics for performance
defmodule MyApp.Telemetry do
  require Telemetry

  @spec execute(event, measurements, metadata, config: GenServer.t() | Keyword.t() | nil)
  def execute(event, measurements, metadata, config: GenServer.t()) | Keyword.t() | nil) do
    measurements
    config
    |> Enum.each(fn {m, _ -> apply_metadata(m, m, config)})
  end
  end

  @spec apply_metadata(map :: map(), metadata :: Keyword.t(), GenServer.t()) :: nil) :: Keyword.t()
  def apply_metadata(map, _metadata, config) do
    # Add application metadata
    Map.put(map, :application, "my_app")
    Map.put(map, :version, Application.spec_version())
    Map.put(map, :env, config_env())
  end
  end
end

Testing Patterns

Hardware Testing with Nerves

# ✅ Good: Use ex_unit with hardware mocks
defmodule Tensioner.HardwareTest do
  use ExUnit.Case, async: true

  describe "GPIO operations" do
  alias Tensioner.Hardware.GPIOSwitch

  setup %{gpio_pins: [1, 2]} do
    {:ok, gpio_pins: gpio_pins}
  end

  test "toggle GPIO pin state" do
    Tensioner.Hardware.GPIOSwitch.toggle(gpio_pins[1], :input)
    assert {:ok, :active} = Tensioner.Hardware.GPIOSwitch.state(gpio_pins[1]).state
    assert {:ok, :disabled} = Tensioner.Hardware.GPIOSwitch.state(gpio_pins[1]).state}
  end
  end

  defmodule MyApp.UARTTest do
  use ExUnit.Case, async: true

  describe "UART communication" do
  alias Tensioner.UART.Port

  setup %{uart_name: :uart_1} do
    {:ok, uart_name: uart_name}
  end

  test "send and receive data" do
    assert :ok = send_data = Tensioner.UART.Port.send(uart_name, "test data")
    received = Tensioner.UART.Port.wait_for_data(uart_name, "received")
    
    assert "test data" == received
  end
  end
end

Integration Testing

# ✅ Good: Integration tests with MockNerves
defmodule MyApp.IntegrationTest do
  use ExUnit.Case, async: true

  describe "Firmware update flow" do
  alias Tensioner.Firmware.ABUpdater

  setup :firmware_mock do
    MockGenServer.start_link(name: Tensioner.Firmware.ABUpdater, [])
  end

  test "A/B firmware update" do
    # Mock A partition with new version
    assert {:ok, :ok} = MockNerves.ABUpdater.update_available("1.0.0")

    # Test update to B partition
    assert {:ok, :ok} = MockNerves.ABUpdater.update_to_partition("1.0.0", "1.1.0")

    # Verify update applied
    assert {:ok, :ok} = MockNerves.ABUpdater.partition_version("1.1.0")
  end

  test "Update complete, switch to B partition" do
    # Switch traffic to B
    assert {:ok, :ok} = MockNerves.ABUpdater.switch_to_partition("1.1.0", "1.1.0")

    # Verify traffic routed to B
    assert {:ok, :ok} = MockNerves.ABUpdater.partition_version("1.1.0")

    # Clean up - switch back to A
    assert {:ok, :ok} = MockNerves.ABUpdater.switch_to_partition("1.0.0", "1.0.0")
  end
  end
end

Best Practices

1. Hardware Abstraction

  • Use Nerves abstractions: GPIO, URT, SPI, I2C, etc.
  • Avoid direct hardware access: Go through Nerves abstractions
  • Design for fault tolerance: Components crash independently
  • Use supervision trees: Every hardware component supervised
  • Implement proper error handling: Hardware failures are common

2. Memory Management

  • Prefer binaries over atoms: Small strings use binaries
  • Limit process count: Nerves has limited processes
  • Use Nerves KV for configuration: Persistent settings
  • Monitor heap usage: Use Observer for profiling

3. Real-Time Constraints

  • Set appropriate cycle times: Hardware limitations apply
  • Use interrupts for responsiveness: GPIO, UART events
  • Avoid blocking operations: Keep main loop responsive
  • Use Task.async for slow operations: Background work

4. Firmware Update Strategy

  • A/B partitions: Two partitions for zero-downtime
  • Incremental updates: Small, tested changes
  • Rollback capability: Always have rollback plan
  • Blue-green deployment: Two production versions running simultaneously
  • Health monitoring: Verify both partitions before cutover

5. Monitoring and Observability

  • Structured logging: Use Logger with appropriate levels
  • Telemetry metrics: Track performance
  • Health checks: Monitor system health
  • Error tracking: Use Sentry for production

6. Testing Strategy

  • Unit tests with mocks: Hardware components need mocking
  • Integration tests with MockNerves: Test real firmware scenarios
  • Property-based tests: StreamData for random edge cases
  • Hardware tests: Test with real hardware when possible

Token Efficiency

Use Nerves patterns for:

  • Hardware abstraction (~60% token savings vs inline code)
  • Memory optimization (~40% reduction vs poor patterns)
  • Real-time patterns (~50% savings vs blocking code)
  • Firmware updates (~70% savings vs manual)
  • Monitoring patterns (~50% savings vs debugging logs)

Tools to Use

  • Nerves_runtime: Runtime environment and supervision

  • Nerves_gpio: GPIO control

  • Nerves_uart: UART communication

  • Nerves_i2c: I2C sensor

  • Nerves_init: Initialization and config

  • Nerves_network: Network stack

  • Nerves_keyring: Secure key management

  • Nerves_system: System monitoring

  • Nerves_time: Precision timing

  • Nerves_pack: Firmware packaging tool

Related Skills

  • otp-patterns: For OTP patterns and supervision trees
  • security-patterns: For secure Nerves applications
  • observability: For monitoring and logging
  • data-pipeline: For streaming data in Nerves
  • firmware
  • migration-patterns: For firmware versioning strategies