| name | go-concurrency |
| description | Use when Go concurrency with goroutines, channels, and sync patterns. Use when writing concurrent Go code. |
| allowed-tools | Bash, Read |
Go Concurrency
Master Go's concurrency model using goroutines, channels, and synchronization primitives for building concurrent applications.
Goroutines
Creating goroutines:
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine")
}
func main() {
// Launch goroutine
go sayHello()
// Anonymous function goroutine
go func() {
fmt.Println("Hello from anonymous goroutine")
}()
// Give goroutines time to execute
time.Sleep(time.Second)
}
Goroutines with parameters:
func printNumber(n int) {
fmt.Println(n)
}
func main() {
for i := 0; i < 10; i++ {
go printNumber(i)
}
time.Sleep(time.Second)
}
Channels
Basic channel operations:
func main() {
// Create unbuffered channel
ch := make(chan int)
// Send in goroutine (non-blocking)
go func() {
ch <- 42
}()
// Receive (blocks until value available)
value := <-ch
fmt.Println(value) // 42
}
Buffered channels:
func main() {
// Buffered channel with capacity 2
ch := make(chan string, 2)
// Can send up to 2 values without blocking
ch <- "first"
ch <- "second"
fmt.Println(<-ch) // first
fmt.Println(<-ch) // second
}
Channel direction:
// Send-only channel
func send(ch chan<- int) {
ch <- 42
}
// Receive-only channel
func receive(ch <-chan int) int {
return <-ch
}
func main() {
ch := make(chan int)
go send(ch)
value := receive(ch)
fmt.Println(value)
}
Closing channels:
func main() {
ch := make(chan int, 3)
ch <- 1
ch <- 2
ch <- 3
close(ch) // Close channel
// Receive until channel is closed
for value := range ch {
fmt.Println(value)
}
// Check if channel is closed
value, ok := <-ch
fmt.Printf("Value: %d, Open: %v\n", value, ok) // Value: 0, Open: false
}
Select Statement
Multiplexing channels:
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(2 * time.Second)
ch2 <- "from ch2"
}()
// Wait for both
for i := 0; i < 2; i++ {
select {
case msg1 := <-ch1:
fmt.Println(msg1)
case msg2 := <-ch2:
fmt.Println(msg2)
}
}
}
Select with default:
func main() {
ch := make(chan int, 1)
select {
case val := <-ch:
fmt.Println(val)
default:
fmt.Println("No value ready") // Executed
}
}
Select with timeout:
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case msg := <-ch:
fmt.Println(msg)
case <-time.After(time.Second):
fmt.Println("Timeout") // Executed after 1 second
}
}
Worker Pools
Implementing worker pool pattern:
func worker(id int, jobs <-chan int, results chan<- int) {
for job := range jobs {
fmt.Printf("Worker %d processing job %d\n", id, job)
time.Sleep(time.Second)
results <- job * 2
}
}
func main() {
jobs := make(chan int, 100)
results := make(chan int, 100)
// Start 3 workers
for w := 1; w <= 3; w++ {
go worker(w, jobs, results)
}
// Send 5 jobs
for j := 1; j <= 5; j++ {
jobs <- j
}
close(jobs)
// Collect results
for a := 1; a <= 5; a++ {
<-results
}
}
sync.WaitGroup
Waiting for goroutines to complete:
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
defer wg.Done() // Decrement counter when done
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
for i := 1; i <= 5; i++ {
wg.Add(1) // Increment counter
go worker(i, &wg)
}
wg.Wait() // Wait for all to complete
fmt.Println("All workers done")
}
sync.Mutex
Protecting shared state:
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock()
c.value++
c.mu.Unlock()
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
var wg sync.WaitGroup
counter := Counter{}
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println(counter.Value()) // 1000
}
sync.RWMutex
Read-write locks:
type Cache struct {
mu sync.RWMutex
items map[string]string
}
func (c *Cache) Get(key string) (string, bool) {
c.mu.RLock() // Read lock
defer c.mu.RUnlock()
val, ok := c.items[key]
return val, ok
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // Write lock
defer c.mu.Unlock()
c.items[key] = value
}
func main() {
cache := Cache{items: make(map[string]string)}
// Multiple readers can access simultaneously
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
cache.Get("key")
}()
}
wg.Wait()
}
sync.Once
Execute once initialization:
var (
instance *Database
once sync.Once
)
type Database struct {
conn string
}
func GetDatabase() *Database {
once.Do(func() {
fmt.Println("Initializing database")
instance = &Database{conn: "connected"}
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 10; i++ {
wg.Add(1)
go func() {
defer wg.Done()
db := GetDatabase() // Only initializes once
fmt.Println(db.conn)
}()
}
wg.Wait()
}
Context Package
Using context for cancellation:
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context, id int) {
for {
select {
case <-ctx.Done():
fmt.Printf("Worker %d cancelled\n", id)
return
default:
fmt.Printf("Worker %d working\n", id)
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
for i := 1; i <= 3; i++ {
go worker(ctx, i)
}
time.Sleep(2 * time.Second)
cancel() // Cancel all workers
time.Sleep(time.Second)
}
Context with timeout:
func slowOperation(ctx context.Context) error {
select {
case <-time.After(3 * time.Second):
return nil
case <-ctx.Done():
return ctx.Err()
}
}
func main() {
ctx, cancel := context.WithTimeout(
context.Background(),
2*time.Second,
)
defer cancel()
err := slowOperation(ctx)
if err != nil {
fmt.Println("Operation timed out:", err)
}
}
Context with values:
func processRequest(ctx context.Context) {
userID := ctx.Value("userID")
fmt.Println("Processing for user:", userID)
}
func main() {
ctx := context.WithValue(
context.Background(),
"userID",
"user123",
)
processRequest(ctx)
}
Error Handling in Concurrent Code
Using errgroup:
import (
"context"
"fmt"
"golang.org/x/sync/errgroup"
"time"
)
func fetchUser(ctx context.Context, id int) error {
time.Sleep(time.Second)
if id == 3 {
return fmt.Errorf("user %d not found", id)
}
fmt.Printf("Fetched user %d\n", id)
return nil
}
func main() {
g, ctx := errgroup.WithContext(context.Background())
userIDs := []int{1, 2, 3, 4, 5}
for _, id := range userIDs {
id := id // Capture loop variable
g.Go(func() error {
return fetchUser(ctx, id)
})
}
// Wait for all goroutines
if err := g.Wait(); err != nil {
fmt.Println("Error:", err)
}
}
Fan-Out Fan-In Pattern
Distributing work and collecting results:
func generator(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func merge(cs ...<-chan int) <-chan int {
out := make(chan int)
var wg sync.WaitGroup
wg.Add(len(cs))
for _, c := range cs {
go func(ch <-chan int) {
defer wg.Done()
for n := range ch {
out <- n
}
}(c)
}
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
in := generator(1, 2, 3, 4, 5)
// Fan out
c1 := square(in)
c2 := square(in)
// Fan in
for n := range merge(c1, c2) {
fmt.Println(n)
}
}
When to Use This Skill
Use go-concurrency when you need to:
- Execute multiple operations concurrently
- Build concurrent servers or workers
- Implement producer-consumer patterns
- Process data streams concurrently
- Handle multiple I/O operations simultaneously
- Implement timeout and cancellation
- Coordinate multiple goroutines
- Build fan-out/fan-in pipelines
- Share state safely between goroutines
- Implement rate limiting or throttling
Best Practices
- Use channels for communication, mutexes for state
- Close channels from sender side only
- Always use WaitGroup to wait for goroutines
- Pass contexts for cancellation and deadlines
- Use buffered channels judiciously
- Protect shared state with mutexes
- Avoid goroutine leaks with proper cleanup
- Use select with default for non-blocking ops
- Prefer sync.Once for initialization
- Document goroutine ownership and lifecycle
Common Pitfalls
- Goroutine leaks (forgetting to exit)
- Race conditions from unprotected shared state
- Deadlocks from improper channel usage
- Sending on closed channels (panics)
- Not checking channel close status
- Overusing mutexes instead of channels
- Creating too many goroutines
- Forgetting to call WaitGroup.Done()
- Passing loop variables to goroutines
- Not handling context cancellation