ハッシュテーブル
ハッシュテーブルは「期待O(1)の探索」を実現する、実務で最も頻繁に使用されるデータ構造である。
91 分で読めます45,096 文字
ハッシュテーブル
ハッシュテーブルは「期待O(1)の探索」を実現する、実務で最も頻繁に使用されるデータ構造である。
この章で学ぶこと
- ハッシュ関数と衝突解決の仕組みを理解する
- ハッシュテーブルの性能特性を説明できる
- 実務での使い分け(dict/set/Map等)を理解する
- ハッシュテーブルのセキュリティ上の考慮点を把握する
- 各言語におけるハッシュテーブルの内部実装を理解する
- 分散システムにおけるハッシュ技法を習得する
前提知識
1. ハッシュテーブルの仕組み
1.1 基本構造
ハッシュテーブル: キー → ハッシュ関数 → インデックス → 値
キー "Alice" → hash("Alice") = 0x7A3B...
→ 0x7A3B % 8 = 3 (テーブルサイズ8)
→ table[3] = "Alice: 100" ┌─────┐
│ 0 │ → (空)
│ 1 │ → ("Bob", 85)
│ 2 │ → (空)
│ 3 │ → ("Alice", 100)
│ 4 │ → ("Charlie", 92) → ("Eve", 78) ← 衝突(チェイニング)
│ 5 │ → (空)
│ 6 │ → ("Diana", 88)
│ 7 │ → (空)
└─────┘
ハッシュテーブルの基本概念は非常にシンプルである。任意のキーをハッシュ関数で整数値に変換し、その整数値をテーブルサイズで割った余りをインデックスとして使用する。この仕組みにより、キーから値への直接的なアクセスが可能となり、期待計算量O(1)の探索が実現される。
1.2 ハッシュ関数の設計原理
良いハッシュ関数は以下の性質を満たす必要がある。
良いハッシュ関数の条件:
1. 決定性: 同じ入力に対して常に同じ出力を返す
2. 均一分布: 出力がハッシュ空間に均等に分布する
3. 効率性: 計算が高速である(O(1)に近い)
4. 雪崩効果: 入力の1ビットの変化が出力の約半数のビットを変える
主要なハッシュ関数:
Division Method:
h(k) = k mod m
mは2の冪を避ける(素数が望ましい)
例: m = 997(素数)
Multiplication Method:
h(k) = floor(m × (k × A mod 1))
A = (√5 - 1) / 2 ≈ 0.6180339887(黄金比)
mが2の冪でも良好に動作
Universal Hashing:
h(k) = ((a × k + b) mod p) mod m
a, bをランダムに選択
衝突確率が1/mに制限される
MurmurHash3:
高速な非暗号ハッシュ関数
Redis, Hadoop, Spark等で使用
SipHash:
Hash DoS耐性のあるPRF
Python 3.4+, Rust, Perl で標準使用
1.3 文字列のハッシュ
# 文字列のハッシュ関数の例
# 1. 単純な方法(均一性が低い)
def bad_hash(s, m):
"""各文字のASCIIコードの合計"""
return sum(ord(c) for c in s) % m
# "abc" と "bca" が同じハッシュ値 → アナグラムが衝突
# 2. 多項式ハッシュ(一般的)
def polynomial_hash(s, m, base=31):
"""多項式ハッシュ: s[0]*base^(n-1) + s[1]*base^(n-2) + ... + s[n-1]"""
h = 0
for c in s:
h = (h * base + ord(c)) % m
return h
# "abc" → 31^2*97 + 31*98 + 99 = 96262
# "bca" → 31^2*98 + 31*99 + 97 = 97168 → 異なるハッシュ値
# 3. Java の String.hashCode() と同等
def java_string_hash(s):
"""s[0]*31^(n-1) + s[1]*31^(n-2) + ... + s[n-1]"""
h = 0
for c in s:
h = h * 31 + ord(c)
h &= 0xFFFFFFFF # 32ビットに制限
return h
# 4. FNV-1a ハッシュ(高速・均一分布)
def fnv1a_hash(data, m):
"""FNV-1a: offset_basis XOR byte → multiply by prime"""
FNV_OFFSET = 0xcbf29ce484222325
FNV_PRIME = 0x100000001b3
h = FNV_OFFSET
for byte in data.encode():
h ^= byte
h *= FNV_PRIME
h &= 0xFFFFFFFFFFFFFFFF # 64ビットに制限
return h % m1.4 衝突解決
衝突解決の2大方式:
1. チェイニング(Separate Chaining):
→ 同じスロットにリンクリストで格納
→ 実装が単純、ロードファクターが1を超えてもOK
→ Java HashMap, Go map
2. オープンアドレス法:
→ 衝突時に別のスロットを探す
→ メモリ効率が良い、キャッシュフレンドリー
→ Python dict, Rust HashMap
探索方法:
- 線形探索: h(k)+1, h(k)+2, ...(クラスタリング問題)
- 二次探索: h(k)+1², h(k)+2², ...
- ダブルハッシュ: h(k)+i×h2(k)
ロードファクター = 要素数 / テーブルサイズ
→ チェイニング: 0.75で拡張(Java)
→ オープンアドレス: 2/3で拡張(Python)
1.5 チェイニングの詳細実装
class HashTableChaining:
"""チェイニング方式のハッシュテーブル"""
def __init__(self, capacity=16, load_factor_threshold=0.75):
self.capacity = capacity
self.load_factor_threshold = load_factor_threshold
self.size = 0
self.buckets = [[] for _ in range(capacity)]
def _hash(self, key):
"""ハッシュ値を計算"""
return hash(key) % self.capacity
def put(self, key, value):
"""キーと値のペアを挿入"""
idx = self._hash(key)
bucket = self.buckets[idx]
# 既存キーの更新
for i, (k, v) in enumerate(bucket):
if k == key:
bucket[i] = (key, value)
return
# 新規挿入
bucket.append((key, value))
self.size += 1
# ロードファクターチェック
if self.size / self.capacity > self.load_factor_threshold:
self._resize()
def get(self, key, default=None):
"""キーから値を取得"""
idx = self._hash(key)
for k, v in self.buckets[idx]:
if k == key:
return v
return default
def remove(self, key):
"""キーを削除"""
idx = self._hash(key)
bucket = self.buckets[idx]
for i, (k, v) in enumerate(bucket):
if k == key:
bucket.pop(i)
self.size -= 1
return True
return False
def __contains__(self, key):
"""in 演算子のサポート"""
return self.get(key) is not None
def _resize(self):
"""テーブルサイズを2倍に拡張"""
old_buckets = self.buckets
self.capacity *= 2
self.buckets = [[] for _ in range(self.capacity)]
self.size = 0
for bucket in old_buckets:
for key, value in bucket:
self.put(key, value)
def load_factor(self):
"""現在のロードファクターを返す"""
return self.size / self.capacity
def __len__(self):
return self.size
def __repr__(self):
items = []
for bucket in self.buckets:
for k, v in bucket:
items.append(f"{k!r}: {v!r}")
return "{" + ", ".join(items) + "}"
# 使用例
ht = HashTableChaining()
ht.put("name", "Alice")
ht.put("age", 30)
ht.put("email", "alice@example.com")
print(ht.get("name")) # "Alice"
print("age" in ht) # True
print(ht.load_factor()) # 0.1875 (3/16)
ht.remove("age")
print(len(ht)) # 21.6 オープンアドレス法の詳細実装
class HashTableOpenAddressing:
"""オープンアドレス法(線形探索)のハッシュテーブル"""
EMPTY = object() # 空スロットの番兵
DELETED = object() # 削除済みスロットの番兵(墓石)
def __init__(self, capacity=16, load_factor_threshold=0.67):
self.capacity = capacity
self.load_factor_threshold = load_factor_threshold
self.size = 0
self.keys = [self.EMPTY] * capacity
self.values = [None] * capacity
def _hash(self, key):
return hash(key) % self.capacity
def _probe(self, key):
"""線形探索でスロットを探す"""
idx = self._hash(key)
first_deleted = None
for i in range(self.capacity):
pos = (idx + i) % self.capacity
if self.keys[pos] is self.EMPTY:
# 空スロットに到達 → キーは存在しない
return (first_deleted if first_deleted is not None else pos, False)
if self.keys[pos] is self.DELETED:
if first_deleted is None:
first_deleted = pos
continue
if self.keys[pos] == key:
return (pos, True) # キーが見つかった
# テーブルが満杯(通常はリサイズで防止)
return (first_deleted if first_deleted is not None else -1, False)
def put(self, key, value):
pos, found = self._probe(key)
if found:
self.values[pos] = value # 更新
else:
self.keys[pos] = key
self.values[pos] = value
self.size += 1
if self.size / self.capacity > self.load_factor_threshold:
self._resize()
def get(self, key, default=None):
pos, found = self._probe(key)
if found:
return self.values[pos]
return default
def remove(self, key):
"""削除: 墓石(DELETED)マーカーを配置"""
pos, found = self._probe(key)
if found:
self.keys[pos] = self.DELETED
self.values[pos] = None
self.size -= 1
return True
return False
def _resize(self):
old_keys = self.keys
old_values = self.values
self.capacity *= 2
self.keys = [self.EMPTY] * self.capacity
self.values = [None] * self.capacity
self.size = 0
for k, v in zip(old_keys, old_values):
if k is not self.EMPTY and k is not self.DELETED:
self.put(k, v)
# 線形探索のクラスタリング問題の可視化
# スロット: [A][B][C][_][_][E][F][G][_][_]
# ^^^^^^^^ ^^^^^^^^
# クラスタ1 クラスタ2
# 新しい要素がクラスタ近くにハッシュされると、クラスタが成長
# → 探索時間が O(1) から O(n) に劣化1.7 Robin Hood ハッシング
class RobinHoodHashTable:
"""Robin Hood ハッシング: 挿入時に「貧しい」要素を優先する"""
EMPTY = None
def __init__(self, capacity=16):
self.capacity = capacity
self.size = 0
self.keys = [self.EMPTY] * capacity
self.values = [None] * capacity
self.distances = [0] * capacity # 理想位置からの距離
def _hash(self, key):
return hash(key) % self.capacity
def put(self, key, value):
if self.size / self.capacity > 0.8:
self._resize()
idx = self._hash(key)
dist = 0
while True:
pos = (idx + dist) % self.capacity
if self.keys[pos] is self.EMPTY:
# 空スロットに挿入
self.keys[pos] = key
self.values[pos] = value
self.distances[pos] = dist
self.size += 1
return
if self.keys[pos] == key:
# 既存キーの更新
self.values[pos] = value
return
# Robin Hood: 現在の要素より探索距離が短い場合、交換
if self.distances[pos] < dist:
# 「裕福な」要素(短い距離)を追い出す
key, self.keys[pos] = self.keys[pos], key
value, self.values[pos] = self.values[pos], value
dist, self.distances[pos] = self.distances[pos], dist
dist += 1
def get(self, key, default=None):
idx = self._hash(key)
dist = 0
while True:
pos = (idx + dist) % self.capacity
if self.keys[pos] is self.EMPTY:
return default
if self.distances[pos] < dist:
# この位置に到達する前に見つかるはず
return default
if self.keys[pos] == key:
return self.values[pos]
dist += 1
def _resize(self):
old_keys = self.keys
old_values = self.values
self.capacity *= 2
self.keys = [self.EMPTY] * self.capacity
self.values = [None] * self.capacity
self.distances = [0] * self.capacity
self.size = 0
for k, v in zip(old_keys, old_values):
if k is not self.EMPTY:
self.put(k, v)
# Robin Hood の利点:
# - 最悪ケースの探索距離が O(log log n) に改善
# - 探索距離の分散が小さい(均一に近い)
# - Rust の HashMap が採用(2015-2021、その後 hashbrown/Swiss Table に移行)1.8 計算量
ハッシュテーブルの計算量:| 操作 | 期待 | 最悪 |
|---|---|---|
| 挿入 | O(1) | O(n) |
| 検索 | O(1) | O(n) |
| 削除 | O(1) | O(n) |
| リサイズ | O(n) | O(n) |
最悪ケース O(n) の条件:
- 全てのキーが同じスロットに衝突
- 意図的な攻撃(Hash DoS)
対策:
- ランダム化ハッシュ(SipHash: Python, Rust)
- 赤黒木へのフォールバック(Java 8+ HashMap)
リサイズの償却分析:
- テーブルサイズが n のとき、リサイズコストは O(n)
- リサイズ間に少なくとも n/2 回の挿入が発生
- 償却コスト = O(n) / (n/2) = O(1)
- つまり、個々の挿入の償却計算量は O(1)
2. 各言語のハッシュテーブル内部実装
2.1 Python dict の内部実装
# Python 3.6+ の dict はコンパクト辞書(順序保持)
# 内部構造:
# - indices: ハッシュテーブル(インデックスの配列)
# - entries: (hash, key, value) のコンパクト配列
# 実装の概念図:
# indices = [None, 1, None, None, 0, None, None, 2]
# ↓ ↓ ↓
# entries = [(hash_a, "age", 30), # index 0
# (hash_n, "name", "Alice"), # index 1
# (hash_e, "email", "x@y")] # index 2
# メリット:
# 1. メモリ効率: indicesは1バイトエントリ(要素数 < 256の場合)
# 2. 順序保持: entries配列は挿入順
# 3. イテレーション高速化: entries配列を順に走査
# Python dict の特殊な最適化
d = {}
# 1. Key-Sharing辞書(同じクラスのインスタンス間でキーを共有)
class Point:
def __init__(self, x, y):
self.x = x
self.y = y
# p1.__dict__ と p2.__dict__ はキー配列を共有
# → メモリ節約(特にクラスのインスタンスが大量にある場合)
# 2. 文字列の特別扱い
# 短い文字列は intern される(同一オブジェクトを再利用)
a = "hello"
b = "hello"
print(a is b) # True(同一オブジェクト)
# 3. __hash__と__eq__のプロトコル
class CustomKey:
def __init__(self, value):
self.value = value
def __hash__(self):
return hash(self.value)
def __eq__(self, other):
return isinstance(other, CustomKey) and self.value == other.value
# ミュータブルなオブジェクトはハッシュ不可
# list, dict, set はキーにできない(TypeError)2.2 Java HashMap の内部実装
// Java 8+ HashMap の内部構造
// - 初期容量: 16
// - ロードファクター: 0.75
// - チェイニング方式
// - バケット内要素数 >= 8 で赤黒木に変換(Treeification)
// - バケット内要素数 <= 6 でリンクリストに戻す
// HashMap の put メソッドの流れ
public V put(K key, V value) {
// 1. key の hashCode() を取得
int hash = hash(key);
// 2. 上位ビットを下位ビットに混ぜる(spread)
// static int hash(Object key) {
// int h;
// return (key == null) ? 0 : (h = key.hashCode()) ^ (h >>> 16);
// }
// 3. index = hash & (capacity - 1)
// capacity が 2の冪なので、ビットANDで高速にmod
// 4. バケットに挿入
// - 空 → 新規ノード
// - リンクリスト → 線形探索で末尾に追加
// - 赤黒木 → 木に挿入
// 5. サイズ > threshold (capacity * loadFactor) ならリサイズ
return putVal(hash, key, value, false, true);
}
// Java 8 以降の Treeification(赤黒木化)
// リンクリスト O(n) → 赤黒木 O(log n)
// Hash DoS 攻撃への対策
// ConcurrentHashMap(スレッドセーフ版)
// - Java 8+: CAS + synchronized(セグメントロック廃止)
// - 読み取りはロックフリー(volatile読み取り)
// - 書き込みはバケット単位のsynchronized
import java.util.concurrent.ConcurrentHashMap;
ConcurrentHashMap<String, Integer> map = new ConcurrentHashMap<>();
map.put("key", 42);
map.computeIfAbsent("key2", k -> expensiveComputation(k));2.3 Go map の内部実装
// Go の map は runtime パッケージで実装
// - バケット方式(各バケットに8つのキー/値ペアを格納)
// - 増分リサイズ(incremental resizing)
// - ランダム化されたイテレーション順序
package main
import "fmt"
func main() {
// 基本操作
m := make(map[string]int)
m["Alice"] = 100
m["Bob"] = 85
// 存在確認(2値返し)
if v, ok := m["Alice"]; ok {
fmt.Printf("Alice: %d\n", v)
}
// 削除
delete(m, "Bob")
// イテレーション(順序は非決定的)
for k, v := range m {
fmt.Printf("%s: %d\n", k, v)
}
}
// Go map の内部構造:
// type hmap struct {
// count int // 要素数
// flags uint8
// B uint8 // バケット数 = 2^B
// noverflow uint16 // オーバーフローバケット数
// hash0 uint32 // ハッシュシード(ランダム化)
// buckets unsafe.Pointer
// oldbuckets unsafe.Pointer // リサイズ中の旧バケット
// ...
// }
// Go map はスレッドセーフではない
// 並行アクセスには sync.Map を使用
// import "sync"
// var m sync.Map
// m.Store("key", "value")
// v, ok := m.Load("key")2.4 Rust HashMap の内部実装
use std::collections::HashMap;
fn main() {
// Rust HashMap は hashbrown クレート(Swiss Table ベース)
// - SipHash-1-3 をデフォルトハッシュ関数として使用(Hash DoS 耐性)
// - SIMD を活用した高速探索
let mut map = HashMap::new();
map.insert("Alice", 100);
map.insert("Bob", 85);
// パターンマッチングで安全にアクセス
match map.get("Alice") {
Some(&score) => println!("Alice: {}", score),
None => println!("Not found"),
}
// Entry API(存在しない場合のみ挿入)
map.entry("Charlie").or_insert(90);
// Entry API(既存値の更新)
let count = map.entry("Alice").or_insert(0);
*count += 10; // Alice: 110
// イテレーション
for (key, value) in &map {
println!("{}: {}", key, value);
}
}
// カスタムハッシュ関数の使用(高速だがDoS非耐性)
// use std::collections::HashMap;
// use std::hash::BuildHasherDefault;
// use ahash::AHasher;
//
// type AHashMap<K, V> = HashMap<K, V, BuildHasherDefault<AHasher>>;
// let mut map: AHashMap<String, i32> = AHashMap::default();3. 実務での活用
3.1 各言語のハッシュテーブル
# Python: dict(最も使用頻度が高いデータ構造)
d = {"name": "Alice", "age": 30}
d["email"] = "alice@example.com" # O(1)
"name" in d # O(1)
# Python: set(重複排除、集合演算)
s = {1, 2, 3}
s.add(4) # O(1)
2 in s # O(1)
s & {2, 3, 4} # 積集合: {2, 3}
s | {4, 5} # 和集合: {1, 2, 3, 4, 5}
# Counter(頻度カウント)
from collections import Counter
words = ["apple", "banana", "apple", "cherry", "banana", "apple"]
count = Counter(words)
# Counter({'apple': 3, 'banana': 2, 'cherry': 1})
count.most_common(2) # [('apple', 3), ('banana', 2)]
# defaultdict(デフォルト値付き辞書)
from collections import defaultdict
graph = defaultdict(list)
graph["A"].append("B") # KeyError なし3.2 ハッシュテーブルの設計パターン
# パターン1: メモ化(キャッシュ)
cache = {}
def expensive_computation(key):
if key not in cache:
cache[key] = compute(key) # 初回のみ計算
return cache[key]
# functools.lru_cache を使ったメモ化
from functools import lru_cache
@lru_cache(maxsize=128)
def fibonacci(n):
if n < 2:
return n
return fibonacci(n - 1) + fibonacci(n - 2)
# パターン2: グルーピング
from collections import defaultdict
def group_anagrams(words):
groups = defaultdict(list)
for word in words:
key = tuple(sorted(word))
groups[key].append(word)
return list(groups.values())
# 使用例
print(group_anagrams(["eat", "tea", "tan", "ate", "nat", "bat"]))
# [['eat', 'tea', 'ate'], ['tan', 'nat'], ['bat']]
# パターン3: Two Sum(ハッシュマップで O(n))
def two_sum(nums, target):
seen = {}
for i, num in enumerate(nums):
complement = target - num
if complement in seen:
return [seen[complement], i]
seen[num] = i
# パターン4: スライディングウィンドウ + ハッシュマップ
def length_of_longest_substring(s):
"""重複のない最長部分文字列の長さ"""
char_index = {}
max_len = 0
start = 0
for i, c in enumerate(s):
if c in char_index and char_index[c] >= start:
start = char_index[c] + 1
char_index[c] = i
max_len = max(max_len, i - start + 1)
return max_len
# 使用例
print(length_of_longest_substring("abcabcbb")) # 3 ("abc")
print(length_of_longest_substring("bbbbb")) # 1 ("b")
print(length_of_longest_substring("pwwkew")) # 3 ("wke")
# パターン5: 頻度マップによる判定
def is_anagram(s, t):
"""2つの文字列がアナグラムかどうか"""
if len(s) != len(t):
return False
count = {}
for c in s:
count[c] = count.get(c, 0) + 1
for c in t:
count[c] = count.get(c, 0) - 1
if count[c] < 0:
return False
return True
# パターン6: ハッシュマップによるグラフの表現
def build_graph(edges):
"""エッジリストから隣接リストを構築"""
graph = defaultdict(list)
for u, v in edges:
graph[u].append(v)
graph[v].append(u)
return graph
edges = [("A", "B"), ("A", "C"), ("B", "C"), ("C", "D")]
graph = build_graph(edges)
# {'A': ['B', 'C'], 'B': ['A', 'C'], 'C': ['A', 'B', 'D'], 'D': ['C']}
# パターン7: 文字列の圧縮/展開
def word_pattern(pattern, s):
"""パターンと文字列の対応を検証(bijection)"""
words = s.split()
if len(pattern) != len(words):
return False
char_to_word = {}
word_to_char = {}
for c, w in zip(pattern, words):
if c in char_to_word:
if char_to_word[c] != w:
return False
else:
if w in word_to_char:
return False
char_to_word[c] = w
word_to_char[w] = c
return True
print(word_pattern("abba", "dog cat cat dog")) # True
print(word_pattern("abba", "dog cat cat fish")) # False3.3 JavaScript/TypeScript でのハッシュテーブル
// Map(順序保持、任意のキー型)
const map = new Map<string, number>();
map.set("Alice", 100);
map.set("Bob", 85);
console.log(map.get("Alice")); // 100
console.log(map.has("Charlie")); // false
console.log(map.size); // 2
// Object vs Map の使い分け
// Object: キーが文字列/Symbol のみ、プロトタイプチェーンあり
// Map: 任意のキー型、サイズ取得 O(1)、イテレーション順保証
// WeakMap(GC対応、キーがオブジェクトのみ)
const weakMap = new WeakMap<object, string>();
let obj = { id: 1 };
weakMap.set(obj, "metadata");
// obj への参照がなくなると、WeakMap のエントリも GC される
// Set
const set = new Set<number>([1, 2, 3, 2, 1]);
console.log(set.size); // 3(重複排除)
set.add(4);
set.delete(2);
console.log([...set]); // [1, 3, 4]
// 実務例: API レスポンスのキャッシュ
class APICache {
private cache: Map<string, { data: unknown; timestamp: number }>;
private ttlMs: number;
constructor(ttlMs: number = 60_000) {
this.cache = new Map();
this.ttlMs = ttlMs;
}
get(key: string): unknown | null {
const entry = this.cache.get(key);
if (!entry) return null;
if (Date.now() - entry.timestamp > this.ttlMs) {
this.cache.delete(key);
return null;
}
return entry.data;
}
set(key: string, data: unknown): void {
this.cache.set(key, { data, timestamp: Date.now() });
}
clear(): void {
this.cache.clear();
}
}4. LRU キャッシュの実装
4.1 OrderedDict を使った実装
from collections import OrderedDict
class LRUCache:
"""Least Recently Used キャッシュ"""
def __init__(self, capacity: int):
self.capacity = capacity
self.cache = OrderedDict()
def get(self, key):
if key not in self.cache:
return -1
# アクセスされたキーを末尾に移動(最近使用)
self.cache.move_to_end(key)
return self.cache[key]
def put(self, key, value):
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = value
if len(self.cache) > self.capacity:
# 先頭(最も古い)を削除
self.cache.popitem(last=False)
# 使用例
cache = LRUCache(3)
cache.put("a", 1)
cache.put("b", 2)
cache.put("c", 3)
print(cache.get("a")) # 1("a"が最近使用に)
cache.put("d", 4) # "b"が削除される(最も古い)
print(cache.get("b")) # -1(削除済み)4.2 ハッシュマップ + 双方向連結リストでの実装
class DLinkedNode:
"""双方向連結リストのノード"""
def __init__(self, key=0, value=0):
self.key = key
self.value = value
self.prev = None
self.next = None
class LRUCacheManual:
"""双方向連結リスト + ハッシュマップによる LRU キャッシュ"""
def __init__(self, capacity: int):
self.capacity = capacity
self.size = 0
self.cache = {} # key -> DLinkedNode
# 番兵ノード(実装を簡潔にする)
self.head = DLinkedNode() # ダミーヘッド(最近使用側)
self.tail = DLinkedNode() # ダミーテール(最も古い側)
self.head.next = self.tail
self.tail.prev = self.head
def get(self, key: int) -> int:
if key not in self.cache:
return -1
node = self.cache[key]
self._move_to_head(node)
return node.value
def put(self, key: int, value: int) -> None:
if key in self.cache:
node = self.cache[key]
node.value = value
self._move_to_head(node)
else:
node = DLinkedNode(key, value)
self.cache[key] = node
self._add_node(node)
self.size += 1
if self.size > self.capacity:
tail = self._pop_tail()
del self.cache[tail.key]
self.size -= 1
def _add_node(self, node):
"""ヘッドの直後に追加"""
node.prev = self.head
node.next = self.head.next
self.head.next.prev = node
self.head.next = node
def _remove_node(self, node):
"""ノードを連結リストから削除"""
prev = node.prev
nxt = node.next
prev.next = nxt
nxt.prev = prev
def _move_to_head(self, node):
"""ノードをヘッド直後に移動"""
self._remove_node(node)
self._add_node(node)
def _pop_tail(self):
"""テールの直前(最も古い)を削除"""
node = self.tail.prev
self._remove_node(node)
return node
# LRU キャッシュの計算量:
# get: O(1) - ハッシュマップ参照 + 連結リスト操作
# put: O(1) - ハッシュマップ挿入 + 連結リスト操作
# 空間: O(capacity)4.3 TTL(Time-To-Live)付きキャッシュ
import time
import threading
from collections import OrderedDict
class TTLCache:
"""有効期限付きキャッシュ"""
def __init__(self, capacity: int, ttl_seconds: float):
self.capacity = capacity
self.ttl = ttl_seconds
self.cache = OrderedDict() # key -> (value, expire_time)
self.lock = threading.Lock()
def get(self, key):
with self.lock:
if key not in self.cache:
return None
value, expire_time = self.cache[key]
if time.time() > expire_time:
# 期限切れ
del self.cache[key]
return None
# LRU: 最近使用に移動
self.cache.move_to_end(key)
return value
def put(self, key, value):
with self.lock:
if key in self.cache:
self.cache.move_to_end(key)
self.cache[key] = (value, time.time() + self.ttl)
# 容量制限
while len(self.cache) > self.capacity:
self.cache.popitem(last=False)
def cleanup(self):
"""期限切れエントリを一括削除"""
with self.lock:
now = time.time()
expired_keys = [
k for k, (_, exp) in self.cache.items()
if now > exp
]
for k in expired_keys:
del self.cache[k]
# 使用例
cache = TTLCache(capacity=100, ttl_seconds=300) # 5分間キャッシュ
cache.put("user:123", {"name": "Alice", "age": 30})
user = cache.get("user:123") # 5分以内なら取得可能5. 一貫性ハッシュ(Consistent Hashing)
5.1 基本概念
一貫性ハッシュ: 分散システムでのデータ分散
従来のハッシュ:
server = hash(key) % num_servers
→ サーバー追加/削除時に全データの再配置が必要
→ サーバーが3台→4台: 75%のデータが移動
一貫性ハッシュ:
サーバーとキーを同じハッシュリング上に配置
→ サーバー追加/削除時に移動するデータは1/N程度
ハッシュリング(0 ~ 2^32-1):
0
/ \
S1 S3
/ \
K1 K4
| |
K2 S2
\ /
K3 K5
\ /
S4---K6
2^32
K1 → 時計回りで最初のサーバー S1 に割り当て
K4 → S2
K5 → S2
K6 → S4
S2 がダウン → K4, K5 は S3 に移動(K1, K2, K3, K6 は影響なし)
5.2 仮想ノードを使った実装
import hashlib
from bisect import bisect_right
class ConsistentHash:
"""仮想ノード付き一貫性ハッシュ"""
def __init__(self, num_replicas=150):
self.num_replicas = num_replicas # 仮想ノード数
self.ring = {} # hash -> node
self.sorted_keys = [] # ソート済みハッシュ値
def _hash(self, key: str) -> int:
"""MD5ハッシュで均一分布を実現"""
digest = hashlib.md5(key.encode()).hexdigest()
return int(digest, 16)
def add_node(self, node: str):
"""ノード(サーバー)を追加"""
for i in range(self.num_replicas):
virtual_key = f"{node}:{i}"
h = self._hash(virtual_key)
self.ring[h] = node
self.sorted_keys.append(h)
self.sorted_keys.sort()
def remove_node(self, node: str):
"""ノードを削除"""
for i in range(self.num_replicas):
virtual_key = f"{node}:{i}"
h = self._hash(virtual_key)
del self.ring[h]
self.sorted_keys.remove(h)
def get_node(self, key: str) -> str:
"""キーが割り当てられるノードを取得"""
if not self.ring:
return None
h = self._hash(key)
idx = bisect_right(self.sorted_keys, h)
# リングの末尾を超えたら先頭に戻る
if idx == len(self.sorted_keys):
idx = 0
return self.ring[self.sorted_keys[idx]]
def get_nodes(self, key: str, n: int = 3) -> list:
"""レプリカ用: キーに対する n 個の異なるノードを取得"""
if not self.ring or n > len(set(self.ring.values())):
return list(set(self.ring.values()))
h = self._hash(key)
idx = bisect_right(self.sorted_keys, h)
result = []
seen = set()
while len(result) < n:
if idx == len(self.sorted_keys):
idx = 0
node = self.ring[self.sorted_keys[idx]]
if node not in seen:
result.append(node)
seen.add(node)
idx += 1
return result
# 使用例: キャッシュサーバーの分散
ch = ConsistentHash(num_replicas=150)
ch.add_node("cache-server-1")
ch.add_node("cache-server-2")
ch.add_node("cache-server-3")
# キーの割り当て
print(ch.get_node("user:1001")) # "cache-server-2"
print(ch.get_node("session:abc")) # "cache-server-1"
# サーバー追加(影響は最小限)
ch.add_node("cache-server-4")
print(ch.get_node("user:1001")) # 多くは同じサーバーに留まる
# レプリカの取得
print(ch.get_nodes("user:1001", n=2)) # 2つの異なるサーバー5.3 一貫性ハッシュの実用例
一貫性ハッシュの実用例:
1. Amazon DynamoDB:
- キーをパーティションに分散
- 仮想ノードでデータの偏りを緩和
- 優先リストでレプリカ先を決定
2. Apache Cassandra:
- パーティションキーのハッシュでノードを決定
- Murmur3Partitioner がデフォルト
- vnodes(仮想ノード)で負荷分散
3. Memcached / Redis Cluster:
- クライアント側で一貫性ハッシュを実装
- サーバーの追加/削除時のキャッシュミスを最小化
4. CDN(Akamai等):
- リクエストURLをハッシュしてエッジサーバーに振り分け
- サーバー障害時の影響を局所化
5. ロードバランサー:
- セッションアフィニティ(同じユーザーを同じサーバーに)
- Nginx の upstream hash モジュール
6. Hash DoS 攻撃と対策
6.1 攻撃の原理
Hash DoS(ハッシュ衝突攻撃):
原理:
- ハッシュ関数の出力が予測可能な場合
- 攻撃者が同じバケットに衝突するキーを大量に生成
- ハッシュテーブルの操作が O(1) → O(n) に劣化
- n個のキーで O(n²) の処理時間
歴史:
- 2003年: Perl のハッシュ衝突攻撃が報告
- 2011年: CCC(Chaos Communication Congress)で
PHP, Python, Ruby, Java等への攻撃が公開
- JSON/XMLリクエストに衝突するキーを大量に含めて送信
- 小さなリクエストでサーバーのCPUを100%使用可能
具体例(Python 3.2以前):
# 以下のキーは同じハッシュ値を持つように構成
# {"key1": 1, "key2": 2, ..., "key100000": 100000}
# → 挿入に O(n²) 時間がかかり、サーバーがハング
6.2 対策
# 対策1: ランダム化ハッシュ(Python 3.3+)
# PYTHONHASHSEED 環境変数でシードを制御
# デフォルトはプロセス起動時にランダムシード
import sys
print(sys.hash_info)
# sys.hash_info(width=64, modulus=2305843009213693951,
# inf=314159, nan=0, imag=1000003,
# algorithm='siphash24', ...)
# 対策2: SipHash(暗号学的PRF)
# Python 3.4+, Rust, Perl 5.18+ で標準採用
# 128ビットの秘密鍵を使用
# Hash DoS に対して安全、かつ十分高速
# 対策3: 赤黒木へのフォールバック(Java 8+)
# HashMap のバケット内要素が8を超えると
# リンクリスト → 赤黒木に変換
# 最悪ケースが O(n) → O(log n) に改善
# 対策4: リクエスト制限
# Webアプリケーションレベルでの対策
# - リクエストサイズの制限
# - JSONパラメータ数の上限設定
# - レート制限
# 対策5: カスタムハッシュ関数の使用
import hmac
import hashlib
def secure_hash(key: str, secret: bytes) -> int:
"""HMAC ベースのセキュアハッシュ"""
h = hmac.new(secret, key.encode(), hashlib.sha256)
return int.from_bytes(h.digest()[:8], 'big')7. ハッシュテーブルの応用データ構造
7.1 カウンティングブルームフィルター
class CountingBloomFilter:
"""削除をサポートするBloom Filter"""
def __init__(self, size, num_hashes):
self.size = size
self.num_hashes = num_hashes
self.counters = [0] * size # ビットではなくカウンター
def _hashes(self, item):
"""k個のハッシュ値を生成"""
import hashlib
indices = []
for i in range(self.num_hashes):
h = hashlib.sha256(f"{item}:{i}".encode()).hexdigest()
indices.append(int(h, 16) % self.size)
return indices
def add(self, item):
for idx in self._hashes(item):
self.counters[idx] += 1
def remove(self, item):
"""標準Bloom Filterでは不可能な削除操作"""
indices = self._hashes(item)
if all(self.counters[idx] > 0 for idx in indices):
for idx in indices:
self.counters[idx] -= 1
def contains(self, item):
return all(self.counters[idx] > 0 for idx in self._hashes(item))7.2 Cuckoo ハッシング
class CuckooHashTable:
"""Cuckoo ハッシング: 2つのハッシュ関数で最悪 O(1) 検索"""
MAX_KICKS = 500 # 最大追い出し回数
def __init__(self, capacity=16):
self.capacity = capacity
self.table1 = [None] * capacity
self.table2 = [None] * capacity
self.size = 0
def _hash1(self, key):
return hash(key) % self.capacity
def _hash2(self, key):
return hash(key * 2654435761) % self.capacity # 別のハッシュ
def get(self, key):
"""O(1) 最悪ケース検索"""
pos1 = self._hash1(key)
if self.table1[pos1] and self.table1[pos1][0] == key:
return self.table1[pos1][1]
pos2 = self._hash2(key)
if self.table2[pos2] and self.table2[pos2][0] == key:
return self.table2[pos2][1]
return None # 見つからない
def put(self, key, value):
"""挿入(Cuckoo 方式で追い出し)"""
# 既存キーの更新チェック
pos1 = self._hash1(key)
if self.table1[pos1] and self.table1[pos1][0] == key:
self.table1[pos1] = (key, value)
return
pos2 = self._hash2(key)
if self.table2[pos2] and self.table2[pos2][0] == key:
self.table2[pos2] = (key, value)
return
# 新規挿入
current = (key, value)
for _ in range(self.MAX_KICKS):
# table1 に挿入を試みる
pos = self._hash1(current[0])
if self.table1[pos] is None:
self.table1[pos] = current
self.size += 1
return
# 追い出し
current, self.table1[pos] = self.table1[pos], current
# table2 に挿入を試みる
pos = self._hash2(current[0])
if self.table2[pos] is None:
self.table2[pos] = current
self.size += 1
return
# 追い出し
current, self.table2[pos] = self.table2[pos], current
# 追い出しループ → リサイズが必要
self._resize()
self.put(current[0], current[1])
def _resize(self):
old_items = []
for entry in self.table1:
if entry:
old_items.append(entry)
for entry in self.table2:
if entry:
old_items.append(entry)
self.capacity *= 2
self.table1 = [None] * self.capacity
self.table2 = [None] * self.capacity
self.size = 0
for key, value in old_items:
self.put(key, value)
# Cuckoo ハッシングの利点:
# - 検索が最悪 O(1)(2箇所を見るだけ)
# - Cuckoo Filter(Bloom Filter の代替)の基盤
# - 挿入の期待計算量は O(1)(追い出し回数の期待値は定数)7.3 Swiss Table(Google の高性能ハッシュテーブル)
Swiss Table の仕組み:
Rust (hashbrown), C++ (Abseil), Go の map が採用
構造:| [KV|KV|..|KV] |
|---|
検索の流れ:
1. H1 = hash の下位ビット → グループのインデックス
2. H2 = hash の上位7ビット
3. SIMD命令で16個のcontrol byteを一括比較
4. マッチしたスロットのみキーを比較
利点:
- SIMD で 16 スロットを同時に比較
- キャッシュライン効率が高い
- 従来のオープンアドレス法より 2-3x 高速
- メモリオーバーヘッドは 1バイト/エントリ のみ
8. ハッシュテーブルのベンチマークと性能比較
8.1 Python での性能測定
import time
import random
import string
def benchmark_dict_operations(n):
"""dict の各操作のベンチマーク"""
# ランダムキー生成
keys = [''.join(random.choices(string.ascii_lowercase, k=10)) for _ in range(n)]
# 挿入
d = {}
start = time.perf_counter()
for k in keys:
d[k] = random.randint(0, 1000000)
insert_time = time.perf_counter() - start
# 検索(存在するキー)
start = time.perf_counter()
for k in keys:
_ = d[k]
search_hit_time = time.perf_counter() - start
# 検索(存在しないキー)
missing = [k + "x" for k in keys]
start = time.perf_counter()
for k in missing:
_ = d.get(k)
search_miss_time = time.perf_counter() - start
# 削除
start = time.perf_counter()
for k in keys:
del d[k]
delete_time = time.perf_counter() - start
print(f"n={n:>10,}")
print(f" 挿入: {insert_time:.4f}s ({insert_time/n*1e6:.2f}μs/op)")
print(f" 検索(hit): {search_hit_time:.4f}s ({search_hit_time/n*1e6:.2f}μs/op)")
print(f" 検索(miss): {search_miss_time:.4f}s ({search_miss_time/n*1e6:.2f}μs/op)")
print(f" 削除: {delete_time:.4f}s ({delete_time/n*1e6:.2f}μs/op)")
# 典型的な結果:
# n= 10,000
# 挿入: 0.0034s (0.34μs/op)
# 検索(hit): 0.0012s (0.12μs/op)
# 検索(miss): 0.0014s (0.14μs/op)
# 削除: 0.0010s (0.10μs/op)
# n= 1,000,000
# 挿入: 0.4521s (0.45μs/op)
# 検索(hit): 0.1523s (0.15μs/op)
# 検索(miss): 0.1842s (0.18μs/op)
# 削除: 0.1234s (0.12μs/op)8.2 データ構造間の性能比較
import time
from sortedcontainers import SortedDict
def compare_dict_vs_sorted(n):
"""dict vs SortedDict vs list の検索性能比較"""
import random
data = [(random.randint(0, n*10), random.randint(0, 1000)) for _ in range(n)]
search_keys = [random.randint(0, n*10) for _ in range(10000)]
# dict
d = dict(data)
start = time.perf_counter()
for k in search_keys:
_ = k in d
dict_time = time.perf_counter() - start
# SortedDict(平衡BST相当)
sd = SortedDict(data)
start = time.perf_counter()
for k in search_keys:
_ = k in sd
sorted_time = time.perf_counter() - start
# list(線形探索)
lst = data
start = time.perf_counter()
for k in search_keys:
_ = any(kk == k for kk, _ in lst)
list_time = time.perf_counter() - start
print(f"n={n:>10,} (10000回検索)")
print(f" dict: {dict_time:.4f}s")
print(f" SortedDict: {sorted_time:.4f}s")
print(f" list: {list_time:.4f}s")
print(f" dict/sorted: {sorted_time/dict_time:.1f}x遅い")
print(f" dict/list: {list_time/dict_time:.1f}x遅い")
# 典型的な結果:
# n= 100,000 (10000回検索)
# dict: 0.0009s
# SortedDict: 0.0213s
# list: 12.4523s
# dict/sorted: 23.7x遅い
# dict/list: 13836.x遅い9. 実務でよく使うパターン集
9.1 頻度カウントと統計
from collections import Counter, defaultdict
# パターン: ログ解析
def analyze_access_log(log_entries):
"""アクセスログの統計分析"""
# エンドポイント別アクセス数
endpoint_count = Counter(entry["path"] for entry in log_entries)
# ステータスコード別集計
status_count = Counter(entry["status"] for entry in log_entries)
# 時間帯別アクセス数
hourly_count = Counter(entry["timestamp"].hour for entry in log_entries)
# IPアドレス別アクセス数(上位10)
ip_count = Counter(entry["ip"] for entry in log_entries)
return {
"top_endpoints": endpoint_count.most_common(10),
"status_distribution": dict(status_count),
"peak_hour": hourly_count.most_common(1)[0],
"top_ips": ip_count.most_common(10),
"total_requests": len(log_entries),
"unique_ips": len(ip_count),
"error_rate": status_count.get(500, 0) / len(log_entries)
}
# パターン: 集合演算による権限管理
def check_permissions(user_roles: set, required_roles: set) -> bool:
"""ユーザーの権限チェック"""
return required_roles.issubset(user_roles)
def common_permissions(user_a_roles: set, user_b_roles: set) -> set:
"""2人のユーザーに共通する権限"""
return user_a_roles & user_b_roles
def exclusive_permissions(user_a_roles: set, user_b_roles: set) -> set:
"""ユーザーAだけが持つ権限"""
return user_a_roles - user_b_roles
# パターン: インデックス構築
def build_inverted_index(documents):
"""転置インデックスの構築(全文検索の基盤)"""
index = defaultdict(set)
for doc_id, text in enumerate(documents):
words = text.lower().split()
for word in words:
# 正規化(句読点除去等)
word = word.strip(".,!?;:")
index[word].add(doc_id)
return index
def search(index, query):
"""AND検索"""
words = query.lower().split()
if not words:
return set()
result = index.get(words[0], set())
for word in words[1:]:
result &= index.get(word, set())
return result
# 使用例
docs = [
"Python is a great programming language",
"Java is also a programming language",
"Python and Java are both popular",
"Rust is a systems programming language"
]
idx = build_inverted_index(docs)
print(search(idx, "programming language")) # {0, 1, 3}
print(search(idx, "Python")) # {0, 2}9.2 データのバリデーションと変換
# パターン: スキーマバリデーション
def validate_schema(data: dict, schema: dict) -> list:
"""簡易スキーマバリデーション"""
errors = []
for field, rules in schema.items():
# 必須チェック
if rules.get("required") and field not in data:
errors.append(f"Missing required field: {field}")
continue
if field not in data:
continue
value = data[field]
# 型チェック
expected_type = rules.get("type")
if expected_type and not isinstance(value, expected_type):
errors.append(f"Invalid type for {field}: expected {expected_type.__name__}")
# 範囲チェック
if "min" in rules and value < rules["min"]:
errors.append(f"{field} must be >= {rules['min']}")
if "max" in rules and value > rules["max"]:
errors.append(f"{field} must be <= {rules['max']}")
# 許可値チェック
if "choices" in rules and value not in rules["choices"]:
errors.append(f"{field} must be one of {rules['choices']}")
return errors
# 使用例
schema = {
"name": {"required": True, "type": str},
"age": {"required": True, "type": int, "min": 0, "max": 150},
"role": {"type": str, "choices": ["admin", "user", "guest"]},
}
errors = validate_schema(
{"name": "Alice", "age": 30, "role": "admin"},
schema
)
print(errors) # []
errors = validate_schema(
{"age": -5, "role": "superuser"},
schema
)
print(errors)
# ['Missing required field: name', 'age must be >= 0',
# 'role must be one of ["admin", "user", "guest"]']
# パターン: データのマッピングと変換
def transform_records(records, field_mapping):
"""フィールド名のマッピングと変換"""
transformed = []
for record in records:
new_record = {}
for old_key, (new_key, converter) in field_mapping.items():
if old_key in record:
new_record[new_key] = converter(record[old_key])
transformed.append(new_record)
return transformed
# 使用例: CSVデータの変換
mapping = {
"full_name": ("name", str.strip),
"birth_year": ("age", lambda y: 2026 - int(y)),
"salary_str": ("salary", lambda s: float(s.replace(",", ""))),
}
raw_data = [
{"full_name": " Alice ", "birth_year": "1990", "salary_str": "85,000.00"},
{"full_name": " Bob ", "birth_year": "1985", "salary_str": "92,500.50"},
]
print(transform_records(raw_data, mapping))
# [{'name': 'Alice', 'age': 36, 'salary': 85000.0},
# {'name': 'Bob', 'age': 41, 'salary': 92500.5}]9.3 設定管理と環境変数
import os
from typing import Any, Optional
class Config:
"""ハッシュテーブルベースの設定管理"""
def __init__(self, defaults: dict = None):
self._config = {}
self._defaults = defaults or {}
def load_from_env(self, prefix: str = "APP_"):
"""環境変数から設定を読み込み"""
for key, value in os.environ.items():
if key.startswith(prefix):
config_key = key[len(prefix):].lower()
self._config[config_key] = value
def load_from_dict(self, data: dict):
"""辞書から設定を読み込み(ネストをフラット化)"""
def flatten(d, parent_key=""):
for k, v in d.items():
new_key = f"{parent_key}.{k}" if parent_key else k
if isinstance(v, dict):
flatten(v, new_key)
else:
self._config[new_key] = v
flatten(data)
def get(self, key: str, default: Any = None) -> Any:
"""設定値を取得(優先順位: 設定 > デフォルト > 引数)"""
if key in self._config:
return self._config[key]
if key in self._defaults:
return self._defaults[key]
return default
def get_int(self, key: str, default: int = 0) -> int:
return int(self.get(key, default))
def get_bool(self, key: str, default: bool = False) -> bool:
val = self.get(key, default)
if isinstance(val, str):
return val.lower() in ("true", "1", "yes")
return bool(val)
# 使用例
config = Config(defaults={
"database.host": "localhost",
"database.port": 5432,
"debug": False,
})
config.load_from_dict({
"database": {"host": "db.example.com", "name": "myapp"},
"cache": {"ttl": 300},
})
print(config.get("database.host")) # "db.example.com"(オーバーライド)
print(config.get("database.port")) # 5432(デフォルト値)
print(config.get("cache.ttl")) # 30010. 実践演習
演習1: 基本操作(基礎)
ハッシュテーブルをゼロから実装せよ(チェイニング方式、リサイズ付き)。以下の要件を満たすこと:
put(key, value),get(key),remove(key),contains(key)の4操作- ロードファクター0.75でリサイズ
__len__,__iter__,__repr__の実装- 衝突数をカウントする機能
演習2: LRUキャッシュ(応用)
OrderedDict または dict + 双方向連結リストで LRUキャッシュを実装せよ。以下の要件を満たすこと:
get(key)とput(key, value)の両操作が O(1)- 容量超過時に最も古いエントリを自動削除
- TTL(有効期限)のサポート
- スレッドセーフ(threading.Lock使用)
演習3: 一貫性ハッシュ(発展)
分散システムで使われる一貫性ハッシュ(Consistent Hashing)を実装せよ。以下の要件を満たすこと:
- 仮想ノードによる負荷分散
- ノードの追加・削除
- レプリカ取得(N個の異なるノードを返す)
- 負荷分散の均一性をテストするベンチマーク
演習4: 転置インデックス(応用)
全文検索エンジンの基盤となる転置インデックスを実装せよ:
- 文書の追加と検索(AND検索、OR検索)
- TF-IDF スコアリング
- 前方一致検索のサポート
演習5: Cuckoo Filter(発展)
Bloom Filter の改良版である Cuckoo Filter を実装せよ:
- 挿入、検索、削除の全操作をサポート
- 偽陽性率のパラメータ調整
- Bloom Filter との性能比較ベンチマーク
FAQ
Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?
実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。
Q2: 初心者がよく陥る間違いは何ですか?
基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。
Q3: 実務ではどのように活用されていますか?
このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。
まとめ
| 概念 | ポイント |
|---|---|
| ハッシュ関数 | キー→整数。均一分布が理想。SipHash/MurmurHash3が実用的 |
| 衝突解決 | チェイニング or オープンアドレス。Robin Hood/Cuckoo も |
| 計算量 | 期待O(1)、最悪O(n)。償却分析でリサイズもO(1) |
| 実務 | dict/set/Map/Counter/defaultdict |
| 内部実装 | Python:コンパクト辞書、Java:赤黒木フォールバック、Rust:Swiss Table |
| セキュリティ | Hash DoS対策にSipHash。リクエスト制限も重要 |
| 分散 | 一貫性ハッシュで負荷分散。仮想ノードで均一化 |
| 順序 | Python 3.7+は挿入順保証。他は非保証 |
次に読むべきガイド
参考文献
- Cormen, T. H. "Introduction to Algorithms." Chapter 11: Hash Tables.
- Sedgewick, R. "Algorithms." Chapter 3.4: Hash Tables.
- Kleppmann, M. "Designing Data-Intensive Applications." Chapter 6: Partitioning.
- Karger, D. et al. "Consistent Hashing and Random Trees." STOC 1997.
- Aumasson, J-P., Bernstein, D. J. "SipHash: a fast short-input PRF." 2012.
- Abseil Team. "Swiss Tables Design Notes." Google, 2017.
- Pagh, R., Rodler, F. F. "Cuckoo Hashing." ESA 2001.
- Fan, B. et al. "Cuckoo Filter: Practically Better Than Bloom." CoNEXT 2014.