Skilore

同期プリミティブ -- Mutex, RWMutex, Once, Pool, atomic

syncパッケージはMutex・RWMutex・Once・Poolなどの同期プリミティブを提供し、sync/atomicはロックフリーのアトミック操作を実現する。

83 分で読めます41,365 文字

同期プリミティブ -- Mutex, RWMutex, Once, Pool, atomic

syncパッケージはMutex・RWMutex・Once・Poolなどの同期プリミティブを提供し、sync/atomicはロックフリーのアトミック操作を実現する。


この章で学ぶこと

  1. Mutex / RWMutex -- 排他制御と読み書きロック
  2. sync.Once / sync.Pool -- 一度だけの初期化とオブジェクトの再利用
  3. sync/atomic -- ロックフリーなアトミック操作
  4. sync.Cond / sync.Map -- 条件変数と並行安全マップ
  5. 実践パターン -- 本番コードでの同期プリミティブ活用

前提知識

このガイドを読む前に、以下の知識があると理解が深まります:


1. Mutex

コード例 1: sync.Mutex の基本

type SafeCounter struct {
    mu sync.Mutex
    v  map[string]int
}
 
func (c *SafeCounter) Inc(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.v[key]++
}
 
func (c *SafeCounter) Value(key string) int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.v[key]
}

Mutex の内部動作

Go の Mutex は2つのモードを持つ: 通常モード(Normal mode)と飢餓モード(Starvation mode)。

通常モード (Normal mode):
  新しく到着した goroutine がロック獲得を試みる
Waiting────>Spinning────>Acquired
goroutine(CAS)Lock
→ 新規 goroutine が待機中の goroutine より先に
    ロックを取得できる(性能は良いが公平ではない)

飢餓モード (Starvation mode):
  待機時間が 1ms を超えた goroutine がいる場合に遷移
FIFO────>Acquired
QueueLock
→ 厳密な FIFO 順序でロックを付与
  → スピンなし、公平性を保証
  → 待機キューが空 or 最後の待機者の待ち時間 < 1ms で
    通常モードに戻る

コード例: Mutex で保護されたキャッシュ

type TTLCache struct {
    mu      sync.Mutex
    items   map[string]cacheItem
    ttl     time.Duration
    cleanCh chan struct{}
}
 
type cacheItem struct {
    value     interface{}
    expiresAt time.Time
}
 
func NewTTLCache(ttl time.Duration) *TTLCache {
    c := &TTLCache{
        items:   make(map[string]cacheItem),
        ttl:     ttl,
        cleanCh: make(chan struct{}),
    }
    go c.cleanup()
    return c
}
 
func (c *TTLCache) Set(key string, value interface{}) {
    c.mu.Lock()
    defer c.mu.Unlock()
    c.items[key] = cacheItem{
        value:     value,
        expiresAt: time.Now().Add(c.ttl),
    }
}
 
func (c *TTLCache) Get(key string) (interface{}, bool) {
    c.mu.Lock()
    defer c.mu.Unlock()
 
    item, ok := c.items[key]
    if !ok {
        return nil, false
    }
    if time.Now().After(item.expiresAt) {
        delete(c.items, key)
        return nil, false
    }
    return item.value, true
}
 
func (c *TTLCache) Delete(key string) {
    c.mu.Lock()
    defer c.mu.Unlock()
    delete(c.items, key)
}
 
func (c *TTLCache) Len() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return len(c.items)
}
 
func (c *TTLCache) cleanup() {
    ticker := time.NewTicker(c.ttl / 2)
    defer ticker.Stop()
 
    for {
        select {
        case <-ticker.C:
            c.mu.Lock()
            now := time.Now()
            for key, item := range c.items {
                if now.After(item.expiresAt) {
                    delete(c.items, key)
                }
            }
            c.mu.Unlock()
        case <-c.cleanCh:
            return
        }
    }
}
 
func (c *TTLCache) Close() {
    close(c.cleanCh)
}

2. RWMutex

コード例 2: sync.RWMutex

type Cache struct {
    mu   sync.RWMutex
    data map[string]string
}
 
func (c *Cache) Get(key string) (string, bool) {
    c.mu.RLock()         // 読み取りロック(複数同時可)
    defer c.mu.RUnlock()
    v, ok := c.data[key]
    return v, ok
}
 
func (c *Cache) Set(key, value string) {
    c.mu.Lock()          // 書き込みロック(排他)
    defer c.mu.Unlock()
    c.data[key] = value
}

RWMutex の詳細な動作

// RWMutex の読み取り/書き込みの動作
//
// 読み取りロック (RLock):
//   - 書き込みロックが保持されていなければ即座に取得
//   - 複数の goroutine が同時に RLock を保持可能
//   - 書き込みロックが待機中の場合、新しい RLock は待機する
//     (書き込み側の飢餓を防ぐ)
//
// 書き込みロック (Lock):
//   - 全ての RLock が解放されるまで待機
//   - 取得後は他の RLock も Lock も取得不可
 
type ConfigManager struct {
    mu     sync.RWMutex
    config map[string]string
}
 
func NewConfigManager() *ConfigManager {
    return &ConfigManager{
        config: make(map[string]string),
    }
}
 
// 読み取りは複数同時可能
func (cm *ConfigManager) Get(key string) string {
    cm.mu.RLock()
    defer cm.mu.RUnlock()
    return cm.config[key]
}
 
// GetAll は設定のスナップショットを返す
func (cm *ConfigManager) GetAll() map[string]string {
    cm.mu.RLock()
    defer cm.mu.RUnlock()
 
    // コピーを返す(ロック外での変更を防ぐ)
    snapshot := make(map[string]string, len(cm.config))
    for k, v := range cm.config {
        snapshot[k] = v
    }
    return snapshot
}
 
// 書き込みは排他
func (cm *ConfigManager) Set(key, value string) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
    cm.config[key] = value
}
 
// バルク更新(トランザクション的な更新)
func (cm *ConfigManager) Update(updates map[string]string) {
    cm.mu.Lock()
    defer cm.mu.Unlock()
 
    for k, v := range updates {
        cm.config[k] = v
    }
}

RWMutex vs Mutex のベンチマーク

// ベンチマークで読み取り割合による性能差を測定
func BenchmarkMutexRead(b *testing.B) {
    var mu sync.Mutex
    data := map[string]int{"key": 42}
 
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.Lock()
            _ = data["key"]
            mu.Unlock()
        }
    })
}
 
func BenchmarkRWMutexRead(b *testing.B) {
    var mu sync.RWMutex
    data := map[string]int{"key": 42}
 
    b.RunParallel(func(pb *testing.PB) {
        for pb.Next() {
            mu.RLock()
            _ = data["key"]
            mu.RUnlock()
        }
    })
}
 
// 結果の目安(8コアマシン):
// BenchmarkMutexRead-8      20000000    60 ns/op
// BenchmarkRWMutexRead-8    50000000    25 ns/op
//
// → 読み取りが95%以上の場合、RWMutexが有利
// → 書き込みが50%以上の場合、Mutexの方が良いことがある
//   (RWMutexのオーバーヘッドのため)

3. sync.Once

コード例 3: sync.Once の基本

var (
    instance *Database
    once     sync.Once
)
 
func GetDB() *Database {
    once.Do(func() {
        // 複数goroutineから呼ばれても1度だけ実行
        instance = &Database{
            conn: connectDB(),
        }
    })
    return instance
}

Go 1.21+ の OnceFunc / OnceValue / OnceValues

// Go 1.21 で追加された便利なヘルパー
 
// sync.OnceFunc: 関数を1回だけ実行するラッパーを返す
cleanup := sync.OnceFunc(func() {
    fmt.Println("クリーンアップ実行")
    db.Close()
})
 
cleanup() // "クリーンアップ実行" が出力
cleanup() // 何も起きない(2回目は実行されない)
 
// sync.OnceValue: 値を1回だけ計算するラッパーを返す
getConfig := sync.OnceValue(func() *Config {
    fmt.Println("設定読み込み中...")
    cfg, err := loadConfig("config.yaml")
    if err != nil {
        panic(err) // panicは再呼び出し時にも再送出される
    }
    return cfg
})
 
cfg := getConfig() // "設定読み込み中..." が出力
cfg = getConfig()  // キャッシュされた値を返す
 
// sync.OnceValues: 値とエラーを返す版
loadCert := sync.OnceValues(func() (*tls.Certificate, error) {
    return tls.LoadX509KeyPair("cert.pem", "key.pem")
})
 
cert, err := loadCert() // 初回: ファイルを読み込む
cert, err = loadCert()  // 2回目: キャッシュされた結果を返す

Once のエラーハンドリングパターン

// sync.Once で初期化エラーを安全に扱うパターン
type LazyDB struct {
    once sync.Once
    db   *sql.DB
    err  error
}
 
func (l *LazyDB) Get() (*sql.DB, error) {
    l.once.Do(func() {
        l.db, l.err = sql.Open("postgres", os.Getenv("DATABASE_URL"))
        if l.err != nil {
            return
        }
        l.err = l.db.Ping()
    })
    return l.db, l.err
}
 
// 注意: Once.Do は panic しても「完了」とみなす
// エラーの場合はリトライしたければ別のアプローチが必要
 
// リトライ可能な初期化(sync.Once は使えない)
type RetryableInit struct {
    mu       sync.Mutex
    db       *sql.DB
    initDone bool
}
 
func (r *RetryableInit) Get() (*sql.DB, error) {
    r.mu.Lock()
    defer r.mu.Unlock()
 
    if r.initDone {
        return r.db, nil
    }
 
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        return nil, err // 次回の呼び出しで再試行可能
    }
 
    if err := db.Ping(); err != nil {
        db.Close()
        return nil, err // 次回の呼び出しで再試行可能
    }
 
    r.db = db
    r.initDone = true
    return r.db, nil
}

4. sync.Pool

コード例 4: sync.Pool の基本

var bufPool = sync.Pool{
    New: func() interface{} {
        return new(bytes.Buffer)
    },
}
 
func processRequest(data []byte) string {
    buf := bufPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()
        bufPool.Put(buf) // プールに返却
    }()
 
    buf.Write(data)
    return buf.String()
}

sync.Pool の実践的な使用例

// JSON エンコーダのプール
var encoderPool = sync.Pool{
    New: func() interface{} {
        return &bytes.Buffer{}
    },
}
 
func respondJSON(w http.ResponseWriter, data interface{}) error {
    buf := encoderPool.Get().(*bytes.Buffer)
    defer func() {
        buf.Reset()
        encoderPool.Put(buf)
    }()
 
    if err := json.NewEncoder(buf).Encode(data); err != nil {
        return err
    }
 
    w.Header().Set("Content-Type", "application/json")
    _, err := w.Write(buf.Bytes())
    return err
}
 
// スライスのプール(固定サイズバッファ)
var slicePool = sync.Pool{
    New: func() interface{} {
        s := make([]byte, 0, 4096)
        return &s
    },
}
 
func processData(input []byte) []byte {
    bufPtr := slicePool.Get().(*[]byte)
    buf := (*bufPtr)[:0] // 長さをリセット、容量は保持
    defer func() {
        *bufPtr = buf[:0]
        slicePool.Put(bufPtr)
    }()
 
    // buf を使って処理
    buf = append(buf, input...)
    // ... 処理 ...
 
    // 結果をコピーして返す(プール外に持ち出す場合はコピー必須)
    result := make([]byte, len(buf))
    copy(result, buf)
    return result
}
 
// sync.Pool を使ったログフォーマッタ
type LogFormatter struct {
    pool sync.Pool
}
 
func NewLogFormatter() *LogFormatter {
    return &LogFormatter{
        pool: sync.Pool{
            New: func() interface{} {
                return &strings.Builder{}
            },
        },
    }
}
 
func (f *LogFormatter) Format(level, msg string, fields map[string]interface{}) string {
    sb := f.pool.Get().(*strings.Builder)
    defer func() {
        sb.Reset()
        f.pool.Put(sb)
    }()
 
    sb.WriteString(time.Now().Format(time.RFC3339))
    sb.WriteString(" [")
    sb.WriteString(level)
    sb.WriteString("] ")
    sb.WriteString(msg)
 
    for k, v := range fields {
        sb.WriteString(" ")
        sb.WriteString(k)
        sb.WriteString("=")
        fmt.Fprintf(sb, "%v", v)
    }
 
    return sb.String()
}

sync.Pool の注意点

sync.Pool の特性
✅ 適切な使い方:
- 頻繁に割り当て・解放される一時オブジェクト
- バッファ、エンコーダ、フォーマッタ
- GC負荷の軽減が目的
❌ 不適切な使い方:
- キャッシュとして使う
→ GCで予告なく回収される
- 接続プール(DB接続、HTTP接続)
→ 接続の状態管理ができない
- 長寿命のオブジェクト
→ Pool の目的に反する
⚠️ 注意事項:
- Pool.Get() の戻り値は必ず初期化してから使う
- Pool.Put() の前にオブジェクトをリセットする
- Pool から取得したオブジェクトを外に持ち出さない
- ベンチマークで効果を確認してから導入する

5. sync/atomic

コード例 5: atomic の基本(Go 1.19+ 型安全API)

type AtomicCounter struct {
    count atomic.Int64  // Go 1.19+
}
 
func (c *AtomicCounter) Inc() {
    c.count.Add(1)
}
 
func (c *AtomicCounter) Value() int64 {
    return c.count.Load()
}
 
// atomic.Value で任意の値を安全に読み書き
var config atomic.Value // *Config
 
func UpdateConfig(cfg *Config) {
    config.Store(cfg)
}
 
func GetConfig() *Config {
    return config.Load().(*Config)
}

atomic の詳細な使い方

// Go 1.19+ の型安全な atomic 型
type Metrics struct {
    RequestCount  atomic.Int64
    ErrorCount    atomic.Int64
    ActiveConns   atomic.Int32
    BytesReceived atomic.Uint64
    IsHealthy     atomic.Bool
}
 
func (m *Metrics) RecordRequest(success bool, bytes uint64) {
    m.RequestCount.Add(1)
    m.BytesReceived.Add(bytes)
    if !success {
        m.ErrorCount.Add(1)
    }
}
 
func (m *Metrics) ConnectionOpened() {
    m.ActiveConns.Add(1)
}
 
func (m *Metrics) ConnectionClosed() {
    m.ActiveConns.Add(-1)
}
 
func (m *Metrics) Snapshot() map[string]interface{} {
    return map[string]interface{}{
        "requests":       m.RequestCount.Load(),
        "errors":         m.ErrorCount.Load(),
        "active_conns":   m.ActiveConns.Load(),
        "bytes_received": m.BytesReceived.Load(),
        "healthy":        m.IsHealthy.Load(),
    }
}

Compare-And-Swap (CAS) パターン

// CAS を使ったロックフリーなスタック
type LockFreeStack struct {
    top atomic.Pointer[node]
}
 
type node struct {
    value int
    next  *node
}
 
func (s *LockFreeStack) Push(value int) {
    newNode := &node{value: value}
    for {
        oldTop := s.top.Load()
        newNode.next = oldTop
        // CAS: top が oldTop のままなら newNode に置き換え
        if s.top.CompareAndSwap(oldTop, newNode) {
            return
        }
        // 失敗した場合はリトライ(他の goroutine が先に変更した)
    }
}
 
func (s *LockFreeStack) Pop() (int, bool) {
    for {
        oldTop := s.top.Load()
        if oldTop == nil {
            return 0, false
        }
        // CAS: top が oldTop のままなら next に置き換え
        if s.top.CompareAndSwap(oldTop, oldTop.next) {
            return oldTop.value, true
        }
        // 失敗した場合はリトライ
    }
}
 
// atomic.Value による設定のホットリロード
type HotConfig struct {
    value atomic.Value
}
 
func NewHotConfig(initial *AppConfig) *HotConfig {
    hc := &HotConfig{}
    hc.value.Store(initial)
    return hc
}
 
func (hc *HotConfig) Get() *AppConfig {
    return hc.value.Load().(*AppConfig)
}
 
func (hc *HotConfig) Reload(newConfig *AppConfig) {
    hc.value.Store(newConfig)
    // 読み取り側は次の Load() で新しい設定を取得
    // ロック不要、読み取りは常にノンブロッキング
}
 
// ファイル監視と組み合わせたホットリロード
func (hc *HotConfig) WatchFile(ctx context.Context, path string) error {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()
 
    var lastModified time.Time
 
    for {
        select {
        case <-ticker.C:
            info, err := os.Stat(path)
            if err != nil {
                log.Printf("設定ファイル確認エラー: %v", err)
                continue
            }
 
            if info.ModTime().After(lastModified) {
                cfg, err := loadAppConfig(path)
                if err != nil {
                    log.Printf("設定読み込みエラー: %v", err)
                    continue
                }
                hc.Reload(cfg)
                lastModified = info.ModTime()
                log.Printf("設定リロード完了: %s", path)
            }
 
        case <-ctx.Done():
            return ctx.Err()
        }
    }
}

6. sync.Map

コード例 6: sync.Map の基本

var cache sync.Map
 
func main() {
    // Store
    cache.Store("key1", "value1")
    cache.Store("key2", "value2")
 
    // Load
    if v, ok := cache.Load("key1"); ok {
        fmt.Println(v.(string))
    }
 
    // LoadOrStore: 既存なら取得、なければ格納
    actual, loaded := cache.LoadOrStore("key3", "value3")
    fmt.Println(actual, loaded) // "value3" false
 
    // Range: 全要素を走査
    cache.Range(func(key, value any) bool {
        fmt.Printf("%s: %s\n", key, value)
        return true // falseで中断
    })
}

sync.Map が適するケースと適さないケース

// sync.Map が適するケース:
//
// 1. キーが安定している(追加はあるが削除は少ない)
// 2. 読み取りが圧倒的に多い
// 3. キーごとにアクセスするgoroutineが異なる(キーの分散)
 
// ケース1: ルーティングテーブル(起動時に設定、ランタイムで読み取り)
var routeHandlers sync.Map
 
func registerRoute(pattern string, handler http.Handler) {
    routeHandlers.Store(pattern, handler)
}
 
func findHandler(pattern string) (http.Handler, bool) {
    v, ok := routeHandlers.Load(pattern)
    if !ok {
        return nil, false
    }
    return v.(http.Handler), true
}
 
// ケース2: goroutine-local なストレージ
var goroutineData sync.Map
 
func processWithID(id int) {
    goroutineData.Store(id, &ProcessState{
        StartTime: time.Now(),
    })
    defer goroutineData.Delete(id)
 
    // 処理...
}
 
// sync.Map が適さないケース → map + RWMutex を使う
//
// 1. 頻繁な書き込みがある
// 2. 全要素の走査(Range)が頻繁
// 3. 型安全性が必要
 
// 型安全な汎用マップ(ジェネリクス + RWMutex)
type SafeMap[K comparable, V any] struct {
    mu   sync.RWMutex
    data map[K]V
}
 
func NewSafeMap[K comparable, V any]() *SafeMap[K, V] {
    return &SafeMap[K, V]{
        data: make(map[K]V),
    }
}
 
func (m *SafeMap[K, V]) Get(key K) (V, bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    v, ok := m.data[key]
    return v, ok
}
 
func (m *SafeMap[K, V]) Set(key K, value V) {
    m.mu.Lock()
    defer m.mu.Unlock()
    m.data[key] = value
}
 
func (m *SafeMap[K, V]) Delete(key K) {
    m.mu.Lock()
    defer m.mu.Unlock()
    delete(m.data, key)
}
 
func (m *SafeMap[K, V]) Len() int {
    m.mu.RLock()
    defer m.mu.RUnlock()
    return len(m.data)
}
 
func (m *SafeMap[K, V]) Range(fn func(K, V) bool) {
    m.mu.RLock()
    defer m.mu.RUnlock()
    for k, v := range m.data {
        if !fn(k, v) {
            break
        }
    }
}

7. sync.Cond

条件変数の使い方

// sync.Cond は条件が満たされるまで goroutine を待機させる
// チャネルでは実現しにくい「ブロードキャスト通知」が可能
 
type BoundedQueue struct {
    mu       sync.Mutex
    notEmpty *sync.Cond
    notFull  *sync.Cond
    items    []interface{}
    maxSize  int
}
 
func NewBoundedQueue(maxSize int) *BoundedQueue {
    q := &BoundedQueue{
        items:   make([]interface{}, 0, maxSize),
        maxSize: maxSize,
    }
    q.notEmpty = sync.NewCond(&q.mu)
    q.notFull = sync.NewCond(&q.mu)
    return q
}
 
func (q *BoundedQueue) Put(item interface{}) {
    q.mu.Lock()
    defer q.mu.Unlock()
 
    // キューが満杯の間は待機
    for len(q.items) >= q.maxSize {
        q.notFull.Wait() // mu.Unlock() → 待機 → mu.Lock()
    }
 
    q.items = append(q.items, item)
    q.notEmpty.Signal() // 1つの待機goroutineを起こす
}
 
func (q *BoundedQueue) Take() interface{} {
    q.mu.Lock()
    defer q.mu.Unlock()
 
    // キューが空の間は待機
    for len(q.items) == 0 {
        q.notEmpty.Wait()
    }
 
    item := q.items[0]
    q.items = q.items[1:]
    q.notFull.Signal()
    return item
}
 
// Broadcast の使用例: 全待機者への通知
type ReadyGate struct {
    mu    sync.Mutex
    cond  *sync.Cond
    ready bool
}
 
func NewReadyGate() *ReadyGate {
    g := &ReadyGate{}
    g.cond = sync.NewCond(&g.mu)
    return g
}
 
func (g *ReadyGate) Wait() {
    g.mu.Lock()
    defer g.mu.Unlock()
    for !g.ready {
        g.cond.Wait()
    }
}
 
func (g *ReadyGate) Open() {
    g.mu.Lock()
    defer g.mu.Unlock()
    g.ready = true
    g.cond.Broadcast() // 全ての待機 goroutine を起こす
}
 
// 使用例
func main() {
    gate := NewReadyGate()
 
    // 複数のワーカーがゲートの開放を待つ
    for i := 0; i < 10; i++ {
        go func(id int) {
            gate.Wait()
            fmt.Printf("Worker %d: started\n", id)
        }(i)
    }
 
    time.Sleep(time.Second)
    fmt.Println("Opening gate...")
    gate.Open() // 全ワーカーが一斉に開始
}

8. sync.WaitGroup の高度な使い方

// WaitGroup + セマフォ: 同時実行数制限付き並行処理
func processWithLimit(items []Item, maxConcurrency int) error {
    var wg sync.WaitGroup
    sem := make(chan struct{}, maxConcurrency)
    errCh := make(chan error, len(items))
 
    for _, item := range items {
        wg.Add(1)
        go func(it Item) {
            defer wg.Done()
 
            sem <- struct{}{}        // セマフォ取得
            defer func() { <-sem }() // セマフォ解放
 
            if err := process(it); err != nil {
                errCh <- err
            }
        }(item)
    }
 
    wg.Wait()
    close(errCh)
 
    // エラーの集約
    var errs []error
    for err := range errCh {
        errs = append(errs, err)
    }
    if len(errs) > 0 {
        return fmt.Errorf("処理エラー %d件: %v", len(errs), errs[0])
    }
    return nil
}
 
// WaitGroup + 進捗報告
type ProgressTracker struct {
    total     int
    completed atomic.Int64
    wg        sync.WaitGroup
}
 
func NewProgressTracker(total int) *ProgressTracker {
    pt := &ProgressTracker{total: total}
    pt.wg.Add(total)
    return pt
}
 
func (pt *ProgressTracker) Done() {
    pt.completed.Add(1)
    pt.wg.Done()
}
 
func (pt *ProgressTracker) Wait() {
    pt.wg.Wait()
}
 
func (pt *ProgressTracker) Progress() float64 {
    return float64(pt.completed.Load()) / float64(pt.total) * 100
}
 
// 使用例
func processFiles(files []string) {
    pt := NewProgressTracker(len(files))
 
    // 進捗報告 goroutine
    go func() {
        ticker := time.NewTicker(time.Second)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                fmt.Printf("進捗: %.1f%%\n", pt.Progress())
                if pt.Progress() >= 100 {
                    return
                }
            }
        }
    }()
 
    for _, file := range files {
        go func(f string) {
            defer pt.Done()
            processFile(f)
        }(file)
    }
 
    pt.Wait()
    fmt.Println("全ファイル処理完了")
}

9. 実践パターン: 複数の同期プリミティブの組み合わせ

パターン1: Sharded Map(シャーディングマップ)

// 大量のキーを持つ並行マップの性能を向上させるシャーディング
const numShards = 32
 
type ShardedMap[V any] struct {
    shards [numShards]struct {
        mu    sync.RWMutex
        items map[string]V
    }
}
 
func NewShardedMap[V any]() *ShardedMap[V] {
    sm := &ShardedMap[V]{}
    for i := range sm.shards {
        sm.shards[i].items = make(map[string]V)
    }
    return sm
}
 
func (sm *ShardedMap[V]) shard(key string) int {
    h := fnv.New32a()
    h.Write([]byte(key))
    return int(h.Sum32()) % numShards
}
 
func (sm *ShardedMap[V]) Get(key string) (V, bool) {
    s := &sm.shards[sm.shard(key)]
    s.mu.RLock()
    defer s.mu.RUnlock()
    v, ok := s.items[key]
    return v, ok
}
 
func (sm *ShardedMap[V]) Set(key string, value V) {
    s := &sm.shards[sm.shard(key)]
    s.mu.Lock()
    defer s.mu.Unlock()
    s.items[key] = value
}
 
func (sm *ShardedMap[V]) Delete(key string) {
    s := &sm.shards[sm.shard(key)]
    s.mu.Lock()
    defer s.mu.Unlock()
    delete(s.items, key)
}
 
func (sm *ShardedMap[V]) Len() int {
    total := 0
    for i := range sm.shards {
        sm.shards[i].mu.RLock()
        total += len(sm.shards[i].items)
        sm.shards[i].mu.RUnlock()
    }
    return total
}

パターン2: Singleton with Lazy Init

// ジェネリクスを使った汎用 Singleton パターン
type Singleton[T any] struct {
    once     sync.Once
    value    T
    initFunc func() T
}
 
func NewSingletonT any T) *Singleton[T] {
    return &Singleton[T]{initFunc: init}
}
 
func (s *Singleton[T]) Get() T {
    s.once.Do(func() {
        s.value = s.initFunc()
    })
    return s.value
}
 
// 使用例
var dbSingleton = NewSingleton(func() *sql.DB {
    db, err := sql.Open("postgres", os.Getenv("DATABASE_URL"))
    if err != nil {
        log.Fatal(err)
    }
    db.SetMaxOpenConns(25)
    db.SetMaxIdleConns(5)
    return db
})
 
func handler(w http.ResponseWriter, r *http.Request) {
    db := dbSingleton.Get()
    // db を使用...
}

パターン3: Rate Limiter (Token Bucket)

// atomic を使ったトークンバケットレートリミッタ
type TokenBucket struct {
    tokens     atomic.Int64
    maxTokens  int64
    refillRate int64 // 1秒あたりの補充数
    lastRefill atomic.Int64
}
 
func NewTokenBucket(maxTokens, refillRate int64) *TokenBucket {
    tb := &TokenBucket{
        maxTokens:  maxTokens,
        refillRate: refillRate,
    }
    tb.tokens.Store(maxTokens)
    tb.lastRefill.Store(time.Now().UnixNano())
    return tb
}
 
func (tb *TokenBucket) refill() {
    now := time.Now().UnixNano()
    last := tb.lastRefill.Load()
    elapsed := float64(now-last) / float64(time.Second)
 
    if elapsed < 0.001 { // 1ms未満は無視
        return
    }
 
    if tb.lastRefill.CompareAndSwap(last, now) {
        newTokens := int64(elapsed * float64(tb.refillRate))
        if newTokens > 0 {
            current := tb.tokens.Load()
            updated := current + newTokens
            if updated > tb.maxTokens {
                updated = tb.maxTokens
            }
            tb.tokens.Store(updated)
        }
    }
}
 
func (tb *TokenBucket) Allow() bool {
    tb.refill()
    for {
        current := tb.tokens.Load()
        if current <= 0 {
            return false
        }
        if tb.tokens.CompareAndSwap(current, current-1) {
            return true
        }
    }
}

10. ASCII図解

図1: Mutex vs RWMutex

Mutex (sync.Mutex):
  G1: [===Lock===]
  G2:             [===Lock===]
  G3:                         [===Lock===]
  → 全アクセスが直列化

RWMutex (sync.RWMutex):
  G1(R): [==RLock==]
  G2(R): [==RLock==]  ← 読み取り同時可
  G3(R): [==RLock==]
  G4(W):              [===Lock===]  ← 書き込みは排他
  G5(R):                           [==RLock==]

  → 読み取りが多い場合にスループット向上

RWMutex の書き込み飢餓防止:
  G1(R): [==RLock==]
  G4(W):              待機 → [===Lock===]
  G5(R):              待機──────────────>[==RLock==]
  → G4(W) が待機中に到着した G5(R) も待機させる

図2: sync.Pool のライフサイクル

sync.Pool
┌──────┐ ┌──────┐ ┌──────┐
buf1buf2buf3プール
└──┬───┘ └──────┘ └──────┘
Get() ───> buf1 を取得
(プール空なら New() 呼出)
Put(buf1) ──> buf1 をプールに返却
※ GC時にプール内のオブジェクトは
回収される可能性がある
Pool の内部構造:
P0 (Processor)
┌─────────────┐ ┌──────────────┐
privateshared
(1つだけ)(ロックフリー)
┌───┐┌───┐┌───┐
bufbufbuf
└───┘└───┘└───┘
└─────────────┘ └──────────────┘
Put: private(空なら)→ shared

図3: atomic操作 vs Mutex

atomic.Add:
  CPU命令レベルで不可分操作
  ┌─────┐
  │ CAS │  Compare-And-Swap
  │命令  │  1命令で読み取り+比較+書き込み
  └─────┘
→ ロック不要、最速

Mutex:
操作
Unlock()
→ コンテキストスイッチのオーバーヘッド

性能比較(目安):
  atomic.Int64.Add:     ~5ns/op
  sync.Mutex + 操作:    ~25ns/op  (競合なし)
  sync.Mutex + 操作:    ~100ns/op (高競合)
  sync.RWMutex (Read):  ~15ns/op  (競合なし)

図4: ShardedMap のアーキテクチャ

キー "user:123" → hash → shard 7
ShardedMap
Shard[0] Shard[1] ... Shard[31]
┌───────┐ ┌───────┐ ┌───────┐
RWMutexRWMutexRWMutex
┌─────┐┌─────┐┌─────┐
mapmapmap
└─────┘└─────┘└─────┘
└───────┘ └───────┘ └───────┘
→ 各シャードが独立したロックを持つ
→ 異なるシャードへのアクセスは並行可能
→ 32シャードで理論上32倍のスループット

11. 比較表

表1: 同期プリミティブの選択指針

プリミティブ 用途 コスト スレッドセーフ
sync.Mutex 単純な排他制御 はい
sync.RWMutex 読み多・書き少 はい
sync.Once 1回だけ初期化 はい
sync.OnceValue 1回だけ計算(値返却) はい
sync.Pool オブジェクト再利用 はい
sync.Map 特定パターンのmap はい
sync.Cond 条件待ち・ブロードキャスト はい
atomic.Int64 単純なカウンタ 最低 はい
atomic.Value 任意の値のアトミック読み書き はい
atomic.Pointer ポインタのCAS操作 はい
channel データの所有権移転 中〜高 はい

表2: sync.Map vs map+Mutex vs ShardedMap

項目 sync.Map map + RWMutex ShardedMap
読み取り性能 非常に高速 高速 非常に高速
書き込み性能 低〜中 高速
適する場面 キーが安定、読み取り主体 頻繁な書き込み 大量キー、高並行
型安全性 any (型アサーション必要) ジェネリクスで型安全 ジェネリクスで型安全
GC負荷 やや高い 低い 低い
実装の複雑さ 最も簡単 簡単 中程度
推奨度 限定的な場面 一般的に推奨 高性能が必要な場面

表3: ロック取得の戦略比較

戦略 仕組み CPU使用 レイテンシ 用途
スピンロック ループで繰り返しCAS 高い 低い 短時間のロック保持
Mutex (Go) スピン→セマフォ 適応的 汎用
チャネル ランタイムスケジューラ 低い 中〜高 メッセージパッシング
atomic CAS CPU命令1つ 最低 最低 単一変数の更新

12. アンチパターン

アンチパターン 1: Mutexのコピー

// BAD: Mutexを含む構造体をコピー
type Counter struct {
    mu sync.Mutex
    n  int
}
 
func (c Counter) Value() int { // 値レシーバ → コピーされる!
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.n
}
 
// GOOD: ポインタレシーバを使う
func (c *Counter) Value() int {
    c.mu.Lock()
    defer c.mu.Unlock()
    return c.n
}
 
// go vet で検出可能: copies lock value

アンチパターン 2: ロックの粒度が大きすぎる

// BAD: 関数全体をロック
func (s *Service) ProcessOrder(order *Order) error {
    s.mu.Lock()
    defer s.mu.Unlock()
 
    validated := validate(order)        // ロック不要な処理
    enriched := enrichData(validated)    // ロック不要な処理
    s.orders[order.ID] = enriched       // これだけロック必要
    return nil
}
 
// GOOD: 必要な箇所だけロック
func (s *Service) ProcessOrder(order *Order) error {
    validated := validate(order)
    enriched := enrichData(validated)
 
    s.mu.Lock()
    s.orders[order.ID] = enriched
    s.mu.Unlock()
    return nil
}

アンチパターン 3: デッドロック

// BAD: ロック順序の不一致によるデッドロック
func transfer(from, to *Account, amount int) {
    from.mu.Lock()   // goroutine1: A → B の順
    to.mu.Lock()     // goroutine2: B → A の順(デッドロック!)
    // ...
}
 
// GOOD: ロック順序を統一する
func transfer(from, to *Account, amount int) {
    // ID の小さい方を先にロック(一貫した順序)
    first, second := from, to
    if from.ID > to.ID {
        first, second = to, from
    }
    first.mu.Lock()
    second.mu.Lock()
    defer first.mu.Unlock()
    defer second.mu.Unlock()
 
    from.Balance -= amount
    to.Balance += amount
}

アンチパターン 4: atomic と Mutex の混在

// BAD: 同じデータに atomic と Mutex を混在させる
type Counter struct {
    mu    sync.Mutex
    count int64
}
 
func (c *Counter) Inc() {
    atomic.AddInt64(&c.count, 1) // atomic で更新
}
 
func (c *Counter) Reset() {
    c.mu.Lock() // Mutex で保護
    c.count = 0
    c.mu.Unlock()
}
// → atomic と Mutex の保護が競合し、データ競合の可能性
 
// GOOD: 一貫した同期メカニズムを使う
type Counter struct {
    count atomic.Int64
}
 
func (c *Counter) Inc()       { c.count.Add(1) }
func (c *Counter) Reset()     { c.count.Store(0) }
func (c *Counter) Value() int64 { return c.count.Load() }

実践演習

演習1: 基本的な実装

以下の要件を満たすコードを実装してください。

要件:

  • 入力データの検証を行うこと
  • エラーハンドリングを適切に実装すること
  • テストコードも作成すること
# 演習1: 基本実装のテンプレート
class Exercise1:
    """基本的な実装パターンの演習"""
 
    def __init__(self):
        self.data = []
 
    def validate_input(self, value):
        """入力値の検証"""
        if value is None:
            raise ValueError("入力値がNoneです")
        return True
 
    def process(self, value):
        """データ処理のメインロジック"""
        self.validate_input(value)
        self.data.append(value)
        return self.data
 
    def get_results(self):
        """処理結果の取得"""
        return {
            'count': len(self.data),
            'data': self.data
        }
 
# テスト
def test_exercise1():
    ex = Exercise1()
    assert ex.process(1) == [1]
    assert ex.process(2) == [1, 2]
    assert ex.get_results()['count'] == 2
 
    try:
        ex.process(None)
        assert False, "例外が発生するべき"
    except ValueError:
        pass
 
    print("全テスト合格!")
 
test_exercise1()

演習2: 応用パターン

基本実装を拡張して、以下の機能を追加してください。

# 演習2: 応用パターン
from typing import List, Dict, Optional
from datetime import datetime
 
class AdvancedExercise:
    """応用パターンの演習"""
 
    def __init__(self, max_size: int = 100):
        self._items: List[Dict] = []
        self._max_size = max_size
        self._created_at = datetime.now()
 
    def add(self, key: str, value: any) -> bool:
        """アイテムの追加(サイズ制限付き)"""
        if len(self._items) >= self._max_size:
            return False
        self._items.append({
            'key': key,
            'value': value,
            'timestamp': datetime.now().isoformat()
        })
        return True
 
    def find(self, key: str) -> Optional[Dict]:
        """キーによる検索"""
        for item in reversed(self._items):
            if item['key'] == key:
                return item
        return None
 
    def remove(self, key: str) -> bool:
        """キーによる削除"""
        for i, item in enumerate(self._items):
            if item['key'] == key:
                self._items.pop(i)
                return True
        return False
 
    def stats(self) -> Dict:
        """統計情報"""
        return {
            'total_items': len(self._items),
            'max_size': self._max_size,
            'usage_percent': len(self._items) / self._max_size * 100,
            'uptime': str(datetime.now() - self._created_at)
        }
 
# テスト
def test_advanced():
    ex = AdvancedExercise(max_size=3)
    assert ex.add("a", 1) == True
    assert ex.add("b", 2) == True
    assert ex.add("c", 3) == True
    assert ex.add("d", 4) == False  # サイズ制限
    assert ex.find("b")['value'] == 2
    assert ex.remove("b") == True
    assert ex.find("b") is None
    stats = ex.stats()
    assert stats['total_items'] == 2
    print("応用テスト全合格!")
 
test_advanced()

演習3: パフォーマンス最適化

以下のコードのパフォーマンスを改善してください。

# 演習3: パフォーマンス最適化
import time
from functools import lru_cache
 
# 最適化前(O(n^2))
def slow_search(data: list, target: int) -> int:
    """非効率な検索"""
    for i in range(len(data)):
        for j in range(i + 1, len(data)):
            if data[i] + data[j] == target:
                return (i, j)
    return (-1, -1)
 
# 最適化後(O(n))
def fast_search(data: list, target: int) -> tuple:
    """ハッシュマップを使った効率的な検索"""
    seen = {}
    for i, num in enumerate(data):
        complement = target - num
        if complement in seen:
            return (seen[complement], i)
        seen[num] = i
    return (-1, -1)
 
# ベンチマーク
def benchmark():
    import random
    data = list(range(5000))
    random.shuffle(data)
    target = data[100] + data[4000]
 
    start = time.time()
    result1 = slow_search(data, target)
    slow_time = time.time() - start
 
    start = time.time()
    result2 = fast_search(data, target)
    fast_time = time.time() - start
 
    print(f"非効率版: {slow_time:.4f}秒")
    print(f"効率版:   {fast_time:.6f}秒")
    print(f"高速化率: {slow_time/fast_time:.0f}倍")
 
benchmark()

ポイント:

  • アルゴリズムの計算量を意識する
  • 適切なデータ構造を選択する
  • ベンチマークで効果を測定する

トラブルシューティング

よくあるエラーと解決策

エラー 原因 解決策
初期化エラー 設定ファイルの不備 設定ファイルのパスと形式を確認
タイムアウト ネットワーク遅延/リソース不足 タイムアウト値の調整、リトライ処理の追加
メモリ不足 データ量の増大 バッチ処理の導入、ページネーションの実装
権限エラー アクセス権限の不足 実行ユーザーの権限確認、設定の見直し
データ不整合 並行処理の競合 ロック機構の導入、トランザクション管理

デバッグの手順

  1. エラーメッセージの確認: スタックトレースを読み、発生箇所を特定する
  2. 再現手順の確立: 最小限のコードでエラーを再現する
  3. 仮説の立案: 考えられる原因をリストアップする
  4. 段階的な検証: ログ出力やデバッガを使って仮説を検証する
  5. 修正と回帰テスト: 修正後、関連する箇所のテストも実行する
# デバッグ用ユーティリティ
import logging
import traceback
from functools import wraps
 
# ロガーの設定
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s [%(levelname)s] %(name)s: %(message)s'
)
logger = logging.getLogger(__name__)
 
def debug_decorator(func):
    """関数の入出力をログ出力するデコレータ"""
    @wraps(func)
    def wrapper(*args, **kwargs):
        logger.debug(f"呼び出し: {func.__name__}(args={args}, kwargs={kwargs})")
        try:
            result = func(*args, **kwargs)
            logger.debug(f"戻り値: {func.__name__} -> {result}")
            return result
        except Exception as e:
            logger.error(f"例外発生: {func.__name__}: {e}")
            logger.error(traceback.format_exc())
            raise
    return wrapper
 
@debug_decorator
def process_data(items):
    """データ処理(デバッグ対象)"""
    if not items:
        raise ValueError("空のデータ")
    return [item * 2 for item in items]

パフォーマンス問題の診断

パフォーマンス問題が発生した場合の診断手順:

  1. ボトルネックの特定: プロファイリングツールで計測
  2. メモリ使用量の確認: メモリリークの有無をチェック
  3. I/O待ちの確認: ディスクやネットワークI/Oの状況を確認
  4. 同時接続数の確認: コネクションプールの状態を確認
問題の種類 診断ツール 対策
CPU負荷 cProfile, py-spy アルゴリズム改善、並列化
メモリリーク tracemalloc, objgraph 参照の適切な解放
I/Oボトルネック strace, iostat 非同期I/O、キャッシュ
DB遅延 EXPLAIN, slow query log インデックス、クエリ最適化

設計判断ガイド

選択基準マトリクス

技術選択を行う際の判断基準を以下にまとめます。

判断基準 重視する場合 妥協できる場合
パフォーマンス リアルタイム処理、大規模データ 管理画面、バッチ処理
保守性 長期運用、チーム開発 プロトタイプ、短期プロジェクト
スケーラビリティ 成長が見込まれるサービス 社内ツール、固定ユーザー
セキュリティ 個人情報、金融データ 公開データ、社内利用
開発速度 MVP、市場投入スピード 品質重視、ミッションクリティカル

アーキテクチャパターンの選択

アーキテクチャ選択フロー
① チーム規模は?
├─ 小規模(1-5人)→ モノリス
└─ 大規模(10人+)→ ②へ
② デプロイ頻度は?
├─ 週1回以下 → モノリス + モジュール分割
└─ 毎日/複数回 → ③へ
③ チーム間の独立性は?
├─ 高い → マイクロサービス
└─ 中程度 → モジュラーモノリス

トレードオフの分析

技術的な判断には必ずトレードオフが伴います。以下の観点で分析を行いましょう:

1. 短期 vs 長期のコスト

  • 短期的に速い方法が長期的には技術的負債になることがある
  • 逆に、過剰な設計は短期的なコストが高く、プロジェクトの遅延を招く

2. 一貫性 vs 柔軟性

  • 統一された技術スタックは学習コストが低い
  • 多様な技術の採用は適材適所が可能だが、運用コストが増加

3. 抽象化のレベル

  • 高い抽象化は再利用性が高いが、デバッグが困難になる場合がある
  • 低い抽象化は直感的だが、コードの重複が発生しやすい
# 設計判断の記録テンプレート
class ArchitectureDecisionRecord:
    """ADR (Architecture Decision Record) の作成"""
 
    def __init__(self, title: str):
        self.title = title
        self.context = ""
        self.decision = ""
        self.consequences = []
        self.alternatives = []
 
    def set_context(self, context: str):
        """背景と課題の記述"""
        self.context = context
        return self
 
    def set_decision(self, decision: str):
        """決定内容の記述"""
        self.decision = decision
        return self
 
    def add_consequence(self, consequence: str, positive: bool = True):
        """結果の追加"""
        self.consequences.append({
            'description': consequence,
            'type': 'positive' if positive else 'negative'
        })
        return self
 
    def add_alternative(self, name: str, reason_rejected: str):
        """却下した代替案の追加"""
        self.alternatives.append({
            'name': name,
            'reason_rejected': reason_rejected
        })
        return self
 
    def to_markdown(self) -> str:
        """Markdown形式で出力"""
        md = f"# ADR: {self.title}\n\n"
        md += f"## 背景\n{self.context}\n\n"
        md += f"## 決定\n{self.decision}\n\n"
        md += "## 結果\n"
        for c in self.consequences:
            icon = "✅" if c['type'] == 'positive' else "⚠️"
            md += f"- {icon} {c['description']}\n"
        md += "\n## 却下した代替案\n"
        for a in self.alternatives:
            md += f"- **{a['name']}**: {a['reason_rejected']}\n"
        return md

13. FAQ

Q1: sync.Onceのfuncがpanicしたらどうなるか?

sync.Onceは一度実行されたらpanicしても「完了」とみなす。再呼び出しされない。Go 1.21でsync.OnceFunc/sync.OnceValueが追加され、panicの再送出やエラーハンドリングが容易になった。OnceFuncはpanicが発生した場合、次の呼び出しでも同じpanicを再送出する。

Q2: sync.Poolはキャッシュとして使えるか?

使えない。sync.PoolのオブジェクトはGC時にいつでも回収される可能性がある。キャッシュにはmap+Mutexや専用ライブラリ(groupcache等)を使う。Poolは一時オブジェクトの再利用(バッファ等)に限定する。2回連続のGCでプール内のオブジェクトは全て回収される仕様である。

Q3: atomicとMutexのどちらが速いか?

単純な整数操作ならatomicが桁違いに速い(ロック不要)。ベンチマーク結果では、低競合時でatomicはMutexの5倍以上速い。ただしatomicは単一の値に対する操作に限られる。複数フィールドの整合性を保つにはMutexが必要。

Q4: sync.Cond と channel のどちらを使うべきか?

ほとんどのケースではchannelが推奨される。sync.Condが有利なのは、(1) ブロードキャスト通知が必要(チャネルのclose相当だが繰り返し可能)、(2) 複雑な条件での待機が必要(for ループ内の条件チェック)、(3) 既存のMutexベースのコードとの統合が必要、の3ケースに限られる。

Q5: go vet や -race フラグで同期の問題を検出できるか?

go vet はMutexのコピーなど静的に検出可能な問題を発見する。go test -race はデータ競合検出器を有効にし、実行時にデータ競合を検出する。ただし -race は実行されたコードパスでのみ検出するため、テストカバレッジが重要。本番では -race はパフォーマンスペナルティ(2-10倍の遅延)があるため通常は無効にする。

Q6: RWMutex で RLock 中に同じ goroutine で Lock を取ると何が起きるか?

デッドロックする。Go の RWMutex はリエントラント(再入可能)ではない。同じ goroutine が RLock を保持した状態で Lock を呼ぶと、RLock が解放されるのを待つが、同じ goroutine が保持しているため永久に待機する。これは設計上の制約であり、ロックの取得順序を注意深く管理する必要がある。


FAQ

Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?

実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。

Q2: 初心者がよく陥る間違いは何ですか?

基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。

Q3: 実務ではどのように活用されていますか?

このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。


まとめ

概念 要点
Mutex 排他制御の基本。defer Unlock()を常に使う
RWMutex 読み取り多の場合に性能向上。書き込み飢餓防止あり
Once 初期化を1回だけ安全に実行。Go 1.21+ で OnceValue 追加
Pool 一時オブジェクトの再利用でGC負荷低減。キャッシュではない
atomic ロックフリーの高速な値操作。CAS パターンも可能
sync.Map 特定パターン向けの並行安全map。一般的には map+RWMutex
sync.Cond 条件変数。ブロードキャスト通知が必要な場面で使う
ShardedMap 高並行環境での大量キーマップ。シャードごとに独立ロック

次に読むべきガイド


参考文献

  1. Go Standard Library: sync -- https://pkg.go.dev/sync
  2. Go Standard Library: sync/atomic -- https://pkg.go.dev/sync/atomic
  3. Go Memory Model -- https://go.dev/ref/mem
  4. Go Blog: "Introducing the Go Race Detector" -- https://go.dev/blog/race-detector
  5. Bryan C. Mills: "Rethinking Classical Concurrency Patterns" -- GopherCon 2018