Claude Code Plugins

Community-maintained marketplace

Feedback

convert-python-erlang

@aRustyDev/ai
0
0

Convert Python code to idiomatic Erlang. Use when migrating Python projects to Erlang, translating Python patterns to idiomatic Erlang, or refactoring Python codebases for fault-tolerance, distributed computing, and concurrency. Extends meta-convert-dev with Python-to-Erlang specific patterns.

Install Skill

1Download skill
2Enable skills in Claude

Open claude.ai/settings/capabilities and find the "Skills" section

3Upload to Claude

Click "Upload skill" and select the downloaded ZIP file

Note: Please verify skill by going through its instructions before using it.

SKILL.md

name convert-python-erlang
description Convert Python code to idiomatic Erlang. Use when migrating Python projects to Erlang, translating Python patterns to idiomatic Erlang, or refactoring Python codebases for fault-tolerance, distributed computing, and concurrency. Extends meta-convert-dev with Python-to-Erlang specific patterns.

Convert Python to Erlang

Convert Python code to idiomatic Erlang. This skill extends meta-convert-dev with Python-to-Erlang specific type mappings, idiom translations, and tooling for transforming dynamic, garbage-collected Python code into functional, concurrent, fault-tolerant Erlang.

This Skill Extends

  • meta-convert-dev - Foundational conversion patterns (APTV workflow, testing strategies)

For general concepts like the Analyze → Plan → Transform → Validate workflow, testing strategies, and common pitfalls, see the meta-skill first.

This Skill Adds

  • Type mappings: Python types → Erlang types (dynamic → dynamic with pattern matching)
  • Idiom translations: Python patterns → idiomatic Erlang
  • Error handling: Exceptions → let it crash + supervision
  • Async patterns: asyncio → processes + message passing
  • Concurrency model: Threading/asyncio → lightweight processes + OTP
  • Class hierarchy: OOP → functional + behavior modules

This Skill Does NOT Cover

  • General conversion methodology - see meta-convert-dev
  • Python language fundamentals - see lang-python-dev
  • Erlang language fundamentals - see lang-erlang-dev
  • Reverse conversion (Erlang → Python) - see convert-erlang-python

Quick Reference

Python Erlang Notes
int integer() Arbitrary precision in both
float float() IEEE 754 double precision
bool true / false atoms Erlang booleans are atoms
str binary() or string() Binary for UTF-8, list of codepoints for string
bytes binary() Direct mapping
list[T] list(T) Singly-linked lists
tuple tuple() Immutable, fixed-size
dict[K, V] map() or dict module Maps for Erlang 17+, dict for older
set[T] sets module Use sets:new()
None undefined atom Or use tagged tuples like {ok, Value} / error
Union[T, U] Tagged tuples {type1, Value} / {type2, Value}
Callable fun() Anonymous or named functions
async def spawn() + process Async → concurrent process
@dataclass record or map Records for structured data
Exception throw/error/exit Let it crash philosophy

When Converting Code

  1. Analyze source thoroughly before writing target
  2. Map types first - create type equivalence table
  3. Embrace immutability - all data is immutable in Erlang
  4. Preserve semantics over syntax similarity
  5. Adopt Erlang idioms - don't write "Python code in Erlang syntax"
  6. Think in processes - replace threads/async with lightweight processes
  7. Let it crash - replace defensive exception handling with supervision
  8. Pattern match - use pattern matching instead of if/else chains
  9. Test equivalence - same inputs → same outputs

Type System Mapping

Primitive Types

Python Erlang Notes
int integer() Both have arbitrary precision
float float() IEEE 754 double precision
bool true / false Atoms, not a separate type
str binary() UTF-8 binary: <<"hello">>
str string() List of codepoints: "hello" = [104,101,108,108,111]
bytes binary() Raw binary data
bytearray binary() Immutable in Erlang
None undefined Atom, or use option pattern

Critical Note on Strings: Erlang has two string representations:

  1. Binaries (<<"text">>): More memory-efficient, preferred for UTF-8 text
  2. Lists ("text"): Lists of integers (codepoints), legacy representation

Use binaries for modern Erlang code.

Collection Types

Python Erlang Notes
list[T] list(T) Singly-linked, immutable
tuple tuple() Fixed-size, immutable, pattern matchable
dict[K, V] map() Modern maps (Erlang 17+): #{key => value}
dict[K, V] dict module Legacy dict module
set[T] sets module sets:new(), sets:add_element()
frozenset[T] sets module All Erlang collections are immutable
collections.deque queue module queue:new(), FIFO operations
collections.OrderedDict map() Maps maintain insertion order (Erlang 18+)
collections.defaultdict maps:get(Key, Map, Default) Use default parameter
collections.Counter map() Map with integer values

Composite Types

Python Erlang Notes
class (data) record Compile-time record definition
class (data) map() Runtime structured data
class (behavior) behaviour module gen_server, gen_statem, etc.
@dataclass -record(name, {fields}) Record with type specs
typing.Protocol behaviour Behavior contracts
typing.TypedDict map() with type spec -type my_map() :: #{field := type()}.
typing.NamedTuple record or tuple Record preferred for clarity
enum.Enum atoms Use atoms for enumerated values
typing.Literal["a", "b"] atoms a, b as atoms
typing.Union[T, U] Tagged tuples {ok, Value} / {error, Reason}
typing.Optional[T] `Value undefined`
typing.Callable[[Args], Ret] fun() fun((Args) -> Ret)

Type Annotations → Type Specs

Python Erlang Notes
def f(x: int) -> int -spec f(integer()) -> integer(). Function type specification
def f(x: T) -> T -spec f(T) -> T when T :: any(). Generic type variable
x: Any any() Top type
x: list[int] [integer()] List of integers
x: Optional[int] integer() | undefined Union type

Idiom Translation

Pattern 1: None Handling (Option Pattern)

Python:

# Optional chaining with walrus operator
if user := get_user(user_id):
    name = user.name
else:
    name = "Anonymous"

# Or simpler
name = user.name if user else "Anonymous"

Erlang:

% Tagged tuple pattern
case get_user(UserId) of
    {ok, User} ->
        Name = maps:get(name, User);
    error ->
        Name = <<"Anonymous">>
end.

% Or with pattern matching in function head
get_user_name(UserId) ->
    case get_user(UserId) of
        {ok, #{name := Name}} -> Name;
        error -> <<"Anonymous">>
    end.

Why this translation:

  • Python uses None/truthy checks; Erlang uses tagged tuples {ok, Value} / error
  • Erlang's pattern matching is more explicit and compile-time checked
  • Tagged tuples are the idiomatic Erlang way to represent optional values

Pattern 2: List Comprehensions

Python:

# List comprehension
squared_evens = [x * x for x in numbers if x % 2 == 0]

# Generator expression
total = sum(x * x for x in numbers if x % 2 == 0)

Erlang:

% List comprehension
SquaredEvens = [X * X || X <- Numbers, X rem 2 == 0].

% Fold for aggregation
Total = lists:foldl(
    fun(X, Acc) when X rem 2 == 0 -> Acc + X * X;
       (_, Acc) -> Acc
    end,
    0,
    Numbers
).

% Or filter + map + sum
Total = lists:sum(
    lists:map(
        fun(X) -> X * X end,
        lists:filter(fun(X) -> X rem 2 == 0 end, Numbers)
    )
).

Why this translation:

  • Erlang list comprehensions are syntactically similar to Python's
  • Use || instead of for, guards instead of if
  • For aggregation, lists:foldl/3 is the standard approach
  • List operations in Erlang are functional transformations

Pattern 3: Dictionary Operations

Python:

# Get with default
value = config.get("timeout", 30)

# Setdefault pattern
cache.setdefault(key, expensive_compute())

# Dictionary comprehension
squared = {k: v * v for k, v in items.items()}

Erlang:

% Get with default
Value = maps:get(timeout, Config, 30).

% Update only if not present (immutable, returns new map)
NewCache = case maps:is_key(Key, Cache) of
    true -> Cache;
    false -> maps:put(Key, expensive_compute(), Cache)
end.

% Map comprehension (Erlang 18+)
Squared = maps:from_list([{K, V * V} || {K, V} <- maps:to_list(Items)]).

% Or more idiomatically with maps:map/2
Squared = maps:map(fun(_K, V) -> V * V end, Items).

Why this translation:

  • Erlang maps are immutable; operations return new maps
  • maps:get/3 takes default as third parameter
  • maps:map/2 transforms values while preserving keys
  • No direct equivalent to setdefault because of immutability

Pattern 4: String Formatting

Python:

# f-strings (Python 3.6+)
message = f"User {user.name} has {count} items"

# format method
message = "User {} has {} items".format(user.name, count)

# % formatting (old style)
message = "User %s has %d items" % (user.name, count)

Erlang:

% io_lib:format/2 (returns iolist, needs flattening for string)
Message = io_lib:format("User ~s has ~p items", [Name, Count]),
FlatMessage = lists:flatten(Message).

% For binaries (more common in modern Erlang)
Message = iolist_to_binary(io_lib:format("User ~s has ~p items", [Name, Count])).

% String concatenation with binaries
Message = <<<<"User ">>/binary, Name/binary, <<" has ">>/binary,
            (integer_to_binary(Count))/binary, <<" items">>/binary>>.

Why this translation:

  • Erlang uses io_lib:format/2 with format specifiers: ~s (string), ~p (any term), ~w (term), ~.2f (float)
  • Returns an iolist, not a string - flatten or convert to binary
  • Binary concatenation is efficient but verbose; use io_lib:format/2 for readability

Pattern 5: Duck Typing → Behaviors

Python:

# Duck typing - if it has a .read() method, it's file-like
def process_data(file_like):
    data = file_like.read()
    return parse(data)

# Works with files, StringIO, BytesIO, etc.

Erlang:

% Behavior-based - explicit contract
-module(file_processor).

% Define behavior
-callback read(State :: any()) -> {ok, binary()} | {error, term()}.

% Use behavior
process_data(Module, State) ->
    case Module:read(State) of
        {ok, Data} -> parse(Data);
        {error, Reason} -> {error, Reason}
    end.

% Or use process-based abstraction
process_data(Pid) ->
    Pid ! {self(), read},
    receive
        {Pid, {ok, Data}} -> parse(Data);
        {Pid, {error, Reason}} -> {error, Reason}
    end.

Why this translation:

  • Erlang uses explicit behaviors (-behaviour(gen_server)) instead of duck typing
  • Behavior modules define callbacks that implementing modules must provide
  • Process-based abstraction is more common: send messages, receive responses
  • Compile-time checking of behavior implementations

Pattern 6: Context Managers → Process Lifecycle

Python:

# with statement for resource management
with open("data.txt") as f:
    data = f.read()
# File automatically closed

# Custom context manager
with lock_held(mutex):
    # Critical section
    pass
# Lock automatically released

Erlang:

% File I/O with automatic cleanup
process_file(Filename) ->
    {ok, File} = file:open(Filename, [read]),
    try
        {ok, Data} = file:read(File, 1024 * 1024),
        process_data(Data)
    after
        file:close(File)
    end.

% Process-based resource management
with_lock(LockPid, Fun) ->
    LockPid ! {self(), acquire},
    receive
        {LockPid, acquired} ->
            try
                Fun()
            after
                LockPid ! {self(), release}
            end
    end.

Why this translation:

  • Erlang uses try...after for cleanup guarantees (similar to Python's finally)
  • Process-based patterns are common: spawn a process to manage the resource
  • Processes can be supervised; if they crash, supervisor handles cleanup
  • No enter/exit protocol; use explicit try...after blocks

Pattern 7: Async/Await → Processes and Message Passing

Python:

# Async function
async def fetch_user(user_id: int) -> dict:
    await asyncio.sleep(0.1)  # Simulate I/O
    return {"id": user_id, "name": f"User {user_id}"}

# Concurrent execution
async def main():
    users = await asyncio.gather(
        fetch_user(1),
        fetch_user(2),
        fetch_user(3)
    )
    print(users)

Erlang:

% Spawned process
fetch_user(UserId) ->
    timer:sleep(100),  % Simulate I/O
    #{id => UserId, name => iolist_to_binary(io_lib:format("User ~p", [UserId]))}.

% Concurrent execution with processes
main() ->
    Self = self(),
    spawn(fun() -> Self ! {user, 1, fetch_user(1)} end),
    spawn(fun() -> Self ! {user, 2, fetch_user(2)} end),
    spawn(fun() -> Self ! {user, 3, fetch_user(3)} end),

    % Collect results
    Users = [receive {user, Id, User} -> User end || Id <- [1, 2, 3]],
    io:format("~p~n", [Users]).

% Or use a process pool pattern
main() ->
    Tasks = [1, 2, 3],
    Results = pmap(fun fetch_user/1, Tasks),
    io:format("~p~n", [Results]).

pmap(Fun, List) ->
    Parent = self(),
    Pids = [spawn(fun() -> Parent ! {self(), Fun(X)} end) || X <- List],
    [receive {Pid, Result} -> Result end || Pid <- Pids].

Why this translation:

  • Python's async/await creates coroutines; Erlang uses lightweight processes
  • Erlang processes are true concurrency primitives (can run on different cores)
  • Message passing replaces async callbacks
  • No event loop needed - BEAM VM handles scheduling
  • Processes are isolated and fault-tolerant

Pattern 8: Exception Handling → Let It Crash

Python:

# Defensive exception handling
def process_data(data):
    try:
        validate(data)
        transformed = transform(data)
        save(transformed)
        return {"status": "success"}
    except ValueError as e:
        return {"status": "error", "message": str(e)}
    except Exception as e:
        logger.error(f"Unexpected error: {e}")
        return {"status": "error", "message": "Internal error"}

Erlang:

% Let it crash - supervisor will restart
process_data(Data) ->
    validate(Data),      % Crash if invalid
    Transformed = transform(Data),  % Crash if transform fails
    save(Transformed),   % Crash if save fails
    {status, success}.

% Supervisor tree handles failures
-module(my_supervisor).
-behaviour(supervisor).

init([]) ->
    SupFlags = #{
        strategy => one_for_one,
        intensity => 5,
        period => 60
    },
    ChildSpecs = [
        #{
            id => worker,
            start => {worker_module, start_link, []},
            restart => permanent,
            shutdown => 5000,
            type => worker
        }
    ],
    {ok, {SupFlags, ChildSpecs}}.

% Only catch errors at supervision boundaries
handle_request(Data) ->
    try
        process_data(Data)
    catch
        error:Reason -> {error, Reason};
        exit:Reason -> {error, {exit, Reason}}
    end.

Why this translation:

  • Python: defensive programming with try/except everywhere
  • Erlang: let processes crash, supervisors restart them
  • Errors are expected; design for failure, not defensive coding
  • Try/catch only at supervision boundaries or API boundaries
  • Supervision trees provide fault isolation and recovery

Pattern 9: Class Inheritance → Behavior + gen_server

Python:

# Class hierarchy
class Animal:
    def __init__(self, name):
        self.name = name

    def speak(self):
        raise NotImplementedError

class Dog(Animal):
    def speak(self):
        return f"{self.name} says Woof!"

# Usage
dog = Dog("Buddy")
print(dog.speak())

Erlang:

% Behavior module (defines interface)
-module(animal).
-callback speak(Name :: binary()) -> binary().

% Implementation module
-module(dog).
-behaviour(animal).
-export([speak/1]).

speak(Name) ->
    <<Name/binary, " says Woof!">>.

% Or use gen_server for stateful objects
-module(dog_server).
-behaviour(gen_server).
-export([start_link/1, speak/1]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link(Name) ->
    gen_server:start_link(?MODULE, Name, []).

speak(Pid) ->
    gen_server:call(Pid, speak).

init(Name) ->
    {ok, #{name => Name}}.

handle_call(speak, _From, #{name := Name} = State) ->
    Reply = <<Name/binary, " says Woof!">>,
    {reply, Reply, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

Why this translation:

  • Erlang has no inheritance; use behaviors for contracts
  • Behaviors define callbacks that modules must implement
  • For stateful objects, use gen_server (OTP behavior)
  • Composition over inheritance is enforced
  • Process-based objects can be supervised

Pattern 10: Decorators → Parse Transforms or Wrapper Functions

Python:

# Function decorator
@cache
def expensive_func(x):
    return compute(x)

# Property decorator
class Circle:
    @property
    def area(self):
        return 3.14159 * self.radius ** 2

Erlang:

% No direct decorator equivalent - use wrapper functions
expensive_func(X) ->
    case cache:get(X) of
        {ok, Result} -> Result;
        error ->
            Result = compute(X),
            cache:put(X, Result),
            Result
    end.

% Records don't have methods - use functions
-record(circle, {radius}).

area(#circle{radius = R}) ->
    3.14159 * R * R.

% Or use maps with computed access
circle_area(#{radius := R}) ->
    3.14159 * R * R.

% Parse transforms for compile-time metaprogramming (advanced)
% Similar to decorators but requires compiler hooks

Why this translation:

  • Erlang has no decorators; use explicit wrapper functions
  • Parse transforms can modify AST at compile time (advanced, rarely needed)
  • Functions are first-class; pass them as arguments for HOF patterns
  • Records and maps don't have methods; use module functions instead

Error Handling

Python Exceptions → Erlang Error Model

Python Erlang When to Use
raise Exception("msg") error({reason, Msg}) Internal errors, let it crash
raise ValueError("msg") error(badarg) Invalid arguments
try...except try...catch API boundaries only
try...finally try...after Resource cleanup
Exception chaining Nested tuples {error, {reason, {cause, SubReason}}}

Exception Translation

Python:

try:
    result = risky_operation()
except ValueError as e:
    handle_value_error(e)
except KeyError as e:
    handle_key_error(e)
except Exception as e:
    handle_generic_error(e)
finally:
    cleanup()

Erlang:

try
    Result = risky_operation(),
    process_result(Result)
catch
    error:badarg -> handle_badarg();
    error:{badkey, _} -> handle_badkey();
    error:Reason -> handle_generic_error(Reason);
    exit:Reason -> handle_exit(Reason)
after
    cleanup()
end.

Concurrency Patterns

Threading → Lightweight Processes

Python:

import threading

def worker(name, delay):
    time.sleep(delay)
    print(f"Worker {name} done")

threads = [
    threading.Thread(target=worker, args=(f"T{i}", 1))
    for i in range(5)
]

for t in threads:
    t.start()

for t in threads:
    t.join()

Erlang:

worker(Name, Delay) ->
    timer:sleep(Delay),
    io:format("Worker ~s done~n", [Name]).

main() ->
    Pids = [
        spawn(fun() -> worker(io_lib:format("T~p", [I]), 1000) end)
        || I <- lists:seq(1, 5)
    ],

    % Wait for all to complete (using monitors)
    [begin
        Ref = monitor(process, Pid),
        receive
            {'DOWN', Ref, process, Pid, _} -> ok
        end
     end || Pid <- Pids].

Asyncio → gen_server

Python:

import asyncio

class Counter:
    def __init__(self):
        self.count = 0

    async def increment(self):
        self.count += 1

    async def get_count(self):
        return self.count

async def main():
    counter = Counter()
    await counter.increment()
    await counter.increment()
    count = await counter.get_count()
    print(f"Count: {count}")

Erlang:

-module(counter).
-behaviour(gen_server).
-export([start_link/0, increment/0, get_count/0]).
-export([init/1, handle_call/3, handle_cast/2]).

start_link() ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).

increment() ->
    gen_server:cast(?MODULE, increment).

get_count() ->
    gen_server:call(?MODULE, get_count).

init([]) ->
    {ok, 0}.

handle_call(get_count, _From, Count) ->
    {reply, Count, Count}.

handle_cast(increment, Count) ->
    {noreply, Count + 1}.

% Usage
main() ->
    {ok, _Pid} = counter:start_link(),
    counter:increment(),
    counter:increment(),
    Count = counter:get_count(),
    io:format("Count: ~p~n", [Count]).

Common Pitfalls

  1. Treating Erlang as Object-Oriented: Erlang is functional. Don't try to recreate class hierarchies. Use behaviors, modules, and processes.

  2. Excessive Try-Catch: Don't wrap everything in try-catch. Let processes crash and use supervisors for fault recovery.

  3. String Confusion: Erlang strings are lists of integers. Use binaries (<<"text">>) for UTF-8 text in modern code.

  4. Mutable State Mindset: All data is immutable. Operations return new values. Use processes for mutable state via message passing.

  5. List Concatenation in Loops: List ++ Element creates a new list each time (O(n)). Use cons [Element | List] and reverse, or use lists:reverse/2.

  6. Ignoring Process Leaks: Spawned processes live until they exit. Always ensure processes terminate or are linked/monitored.

  7. Not Using Pattern Matching: Erlang's strength is pattern matching. Use it in function heads, case expressions, and receive clauses.

  8. Forgetting Tail Recursion: Recursive functions must be tail-recursive to avoid stack overflow. Use accumulator parameters.

  9. Maps vs Records: Records are compile-time, maps are runtime. Records are faster and type-checked, but less flexible.

  10. Process Bottlenecks: Single process handling all messages becomes a bottleneck. Design for parallelism with process pools or parallel message handling.


Tooling

Tool Purpose Notes
rebar3 Build tool Standard build tool for Erlang projects
dialyzer Static analyzer Type checking via success typing
eunit Unit testing Built-in unit test framework
common_test Integration testing OTP testing framework
PropEr Property-based testing Similar to Python's Hypothesis
meck Mocking library Mock modules for testing
observer GUI profiler Visual process/memory inspector
dbg Tracing Built-in tracing for debugging
py2erl Transpiler Experimental Python to Erlang compiler
ErlPort Python interop Call Python from Erlang

Examples

Example 1: Simple - HTTP Request Handler

Before (Python):

from flask import Flask, request, jsonify

app = Flask(__name__)

@app.route('/user/<int:user_id>', methods=['GET'])
def get_user(user_id):
    user = database.find_user(user_id)
    if user:
        return jsonify(user)
    else:
        return jsonify({"error": "Not found"}), 404

if __name__ == '__main__':
    app.run()

After (Erlang):

% Using Cowboy web server
-module(user_handler).
-export([init/2]).

init(Req0, State) ->
    UserId = cowboy_req:binding(user_id, Req0),
    case database:find_user(binary_to_integer(UserId)) of
        {ok, User} ->
            Req = cowboy_req:reply(200,
                #{<<"content-type">> => <<"application/json">>},
                jsx:encode(User),
                Req0),
            {ok, Req, State};
        error ->
            Req = cowboy_req:reply(404,
                #{<<"content-type">> => <<"application/json">>},
                jsx:encode(#{error => <<"Not found">>}),
                Req0),
            {ok, Req, State}
    end.

Example 2: Medium - Concurrent Data Processing

Before (Python):

import asyncio

async def process_item(item):
    # Simulate processing
    await asyncio.sleep(0.1)
    return item * 2

async def process_batch(items):
    tasks = [process_item(item) for item in items]
    results = await asyncio.gather(*tasks)
    return sum(results)

# Usage
items = list(range(1, 11))
result = asyncio.run(process_batch(items))
print(f"Total: {result}")

After (Erlang):

-module(batch_processor).
-export([process_batch/1]).

process_item(Item) ->
    timer:sleep(100),  % Simulate processing
    Item * 2.

process_batch(Items) ->
    Parent = self(),
    Pids = [spawn(fun() ->
                Result = process_item(Item),
                Parent ! {self(), Result}
            end) || Item <- Items],

    % Collect results
    Results = [receive {Pid, Result} -> Result end || Pid <- Pids],
    lists:sum(Results).

% Usage
% 1> Items = lists:seq(1, 10).
% 2> batch_processor:process_batch(Items).
% 110

Example 3: Complex - Stateful Worker Pool with Supervision

Before (Python):

import asyncio
from concurrent.futures import ThreadPoolExecutor
import queue

class WorkerPool:
    def __init__(self, num_workers=5):
        self.queue = queue.Queue()
        self.workers = []
        self.executor = ThreadPoolExecutor(max_workers=num_workers)

    def start(self):
        for i in range(5):
            future = self.executor.submit(self.worker, i)
            self.workers.append(future)

    def worker(self, worker_id):
        while True:
            try:
                task = self.queue.get(timeout=1)
                result = self.process_task(task)
                print(f"Worker {worker_id}: {result}")
                self.queue.task_done()
            except queue.Empty:
                continue

    def process_task(self, task):
        # Simulate work
        return task * 2

    def submit(self, task):
        self.queue.put(task)

    def shutdown(self):
        self.executor.shutdown(wait=True)

# Usage
pool = WorkerPool(num_workers=5)
pool.start()
for i in range(20):
    pool.submit(i)

After (Erlang):

%% Worker pool supervisor
-module(worker_pool_sup).
-behaviour(supervisor).
-export([start_link/1, init/1]).

start_link(PoolSize) ->
    supervisor:start_link({local, ?MODULE}, ?MODULE, PoolSize).

init(PoolSize) ->
    SupFlags = #{
        strategy => one_for_one,
        intensity => 5,
        period => 60
    },

    Workers = [
        #{
            id => {worker, N},
            start => {worker, start_link, [N]},
            restart => permanent,
            shutdown => 5000,
            type => worker
        } || N <- lists:seq(1, PoolSize)
    ],

    {ok, {SupFlags, Workers}}.

%% Worker gen_server
-module(worker).
-behaviour(gen_server).
-export([start_link/1, submit_task/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link(WorkerId) ->
    gen_server:start_link({local, list_to_atom("worker_" ++ integer_to_list(WorkerId))},
                          ?MODULE, WorkerId, []).

submit_task(WorkerPid, Task) ->
    gen_server:cast(WorkerPid, {task, Task}).

init(WorkerId) ->
    {ok, #{worker_id => WorkerId}}.

handle_cast({task, Task}, #{worker_id := WorkerId} = State) ->
    Result = process_task(Task),
    io:format("Worker ~p: ~p~n", [WorkerId, Result]),
    {noreply, State}.

handle_call(_Request, _From, State) ->
    {reply, ok, State}.

handle_info(_Info, State) ->
    {noreply, State}.

process_task(Task) ->
    Task * 2.

%% Pool manager
-module(pool_manager).
-export([start/1, submit/1]).

start(PoolSize) ->
    worker_pool_sup:start_link(PoolSize).

submit(Task) ->
    % Simple round-robin distribution
    Workers = [list_to_atom("worker_" ++ integer_to_list(N)) || N <- lists:seq(1, 5)],
    Worker = lists:nth(rand:uniform(length(Workers)), Workers),
    worker:submit_task(Worker, Task).

%% Usage
%% 1> pool_manager:start(5).
%% 2> [pool_manager:submit(I) || I <- lists:seq(1, 20)].

Key differences:

  • Python: ThreadPoolExecutor with shared queue
  • Erlang: Supervised worker processes with message passing
  • Erlang workers are fault-tolerant (supervisor restarts failed workers)
  • No shared state; each worker is an isolated process
  • Erlang's supervision tree provides automatic recovery

Limitations

Due to gaps in the lang-erlang-dev skill (5/8 pillars), external research was required for:

  • Serialization idioms: JSON encoding/decoding with jsx, term serialization
  • Build/dependency management: rebar3 usage, dependency declaration, release building
  • Metaprogramming details: Parse transforms (mentioned but limited coverage)

These areas are covered in this skill based on external documentation and community patterns.


See Also

For more examples and patterns, see:

  • meta-convert-dev - Foundational patterns with cross-language examples
  • convert-python-rust - Python → Rust conversion for performance focus
  • lang-python-dev - Python development patterns
  • lang-erlang-dev - Erlang development patterns

Cross-cutting pattern skills:

  • patterns-concurrency-dev - Process patterns, message passing, supervision across languages
  • patterns-testing-dev - Unit testing, property-based testing, mocking strategies across languages

References