Skilore

データベース — sqlx / diesel / SeaORM

Rust でデータベースを扱うための主要 3 クレートを比較し、プロジェクトに最適な ORM / クエリビルダーを選択するための実践ガイド。

111 分で読めます55,253 文字

データベース — sqlx / diesel / SeaORM

Rust でデータベースを扱うための主要 3 クレートを比較し、プロジェクトに最適な ORM / クエリビルダーを選択するための実践ガイド。


この章で学ぶこと

  1. sqlx — コンパイル時 SQL 検証を持つ非同期クエリライブラリ
  2. diesel — 型安全な DSL ベースの同期 ORM
  3. SeaORM — ActiveRecord パターンの非同期 ORM

前提知識

このガイドを読む前に、以下の知識があると理解が深まります:

  • 基本的なプログラミングの知識
  • 関連する基礎概念の理解
  • Serde — JSON/TOML/YAML の内容を理解していること

1. Rust データベースクレートの全体像

1.1 レイヤー構成

Application Code
ORM / Query Layer
┌──────────┐ ┌──────────┐ ┌──────────┐
dieselSeaORMsqlx
(DSL型)(AR型)(Raw SQL)
└─────┬────┘ └─────┬────┘ └─────┬────┘
Connection Pool
┌──────────────────────────────────────┐
sqlx::Pool / deadpool / bb8
└──────────────────┬───────────────────┘
Driver
┌──────────┐ ┌──────────┐ ┌──────────┐
tokio-sqlx-libpq /
postgressqlitemysqlclient
└──────────┘ └──────────┘ └──────────┘

1.2 選択フロー

Rust でDBを使いたい
       │
       ├── 生の SQL を書きたい?
       │      │
       │      ├── Yes → sqlx
       │      │         (コンパイル時SQL検証が魅力)
       │      │
       │      └── No ──┐
       │               │
       ├── 型安全な DSL が欲しい?
       │      │
       │      ├── Yes → diesel
       │      │         (コンパイル時に全クエリを型検査)
       │      │
       │      └── No ──┐
       │               │
       └── ActiveRecord パターンが好み?
              │
              └── Yes → SeaORM
                        (Rails/Laravel 的な使い心地)

2. sqlx — コンパイル時 SQL 検証

2.1 セットアップ

# Cargo.toml
[dependencies]
sqlx = { version = "0.8", features = [
    "runtime-tokio",     # 非同期ランタイム
    "tls-rustls",        # TLS
    "postgres",          # PostgreSQL ドライバ
    "macros",            # query! マクロ
    "migrate",           # マイグレーション
    "chrono",            # 日時型サポート
    "uuid",              # UUID 型サポート
] }
tokio = { version = "1", features = ["full"] }

2.2 基本的な CRUD 操作

use sqlx::{PgPool, FromRow};
use uuid::Uuid;
use chrono::{DateTime, Utc};
 
#[derive(Debug, FromRow)]
struct User {
    id: Uuid,
    name: String,
    email: String,
    created_at: DateTime<Utc>,
}
 
// コネクションプール作成
async fn create_pool() -> Result<PgPool, sqlx::Error> {
    PgPool::builder()
        .max_connections(10)
        .connect("postgres://user:pass@localhost:5432/mydb")
        .await
}
 
// INSERT — query! マクロでコンパイル時に SQL を検証
async fn create_user(pool: &PgPool, name: &str, email: &str) -> Result<User, sqlx::Error> {
    sqlx::query_as!(
        User,
        r#"
        INSERT INTO users (id, name, email, created_at)
        VALUES ($1, $2, $3, NOW())
        RETURNING id, name, email, created_at
        "#,
        Uuid::new_v4(),
        name,
        email,
    )
    .fetch_one(pool)
    .await
}
 
// SELECT — 複数行取得
async fn list_users(pool: &PgPool, limit: i64) -> Result<Vec<User>, sqlx::Error> {
    sqlx::query_as!(
        User,
        "SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1",
        limit,
    )
    .fetch_all(pool)
    .await
}
 
// UPDATE
async fn update_email(pool: &PgPool, user_id: Uuid, new_email: &str) -> Result<bool, sqlx::Error> {
    let result = sqlx::query!(
        "UPDATE users SET email = $1 WHERE id = $2",
        new_email,
        user_id,
    )
    .execute(pool)
    .await?;
 
    Ok(result.rows_affected() > 0)
}
 
// DELETE
async fn delete_user(pool: &PgPool, user_id: Uuid) -> Result<bool, sqlx::Error> {
    let result = sqlx::query!("DELETE FROM users WHERE id = $1", user_id)
        .execute(pool)
        .await?;
 
    Ok(result.rows_affected() > 0)
}

2.3 マイグレーション

# sqlx-cli のインストール
cargo install sqlx-cli --no-default-features --features postgres
 
# マイグレーション作成
sqlx migrate add create_users_table
 
# 生成されたファイルを編集
# migrations/20260101000000_create_users_table.sql
-- migrations/20260101000000_create_users_table.sql
CREATE TABLE users (
    id         UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name       VARCHAR(255) NOT NULL,
    email      VARCHAR(255) NOT NULL UNIQUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
 
CREATE INDEX idx_users_email ON users (email);
# マイグレーション実行
sqlx migrate run
 
# マイグレーション状態確認
sqlx migrate info

2.4 トランザクションの高度な使い方

sqlx では pool.begin() でトランザクションを開始し、commit() または rollback() で終了する。TransactionDrop 時に自動ロールバックするため、明示的な rollback() は必須ではない。

use sqlx::PgPool;
 
/// ネストされたトランザクション(SAVEPOINT を使用)
async fn nested_transaction_example(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut tx = pool.begin().await?;
 
    // 外側のトランザクションで操作
    sqlx::query!("INSERT INTO audit_log (action) VALUES ('start_batch')")
        .execute(&mut *tx)
        .await?;
 
    // SAVEPOINT を使ったネストされたトランザクション
    let mut savepoint = tx.begin().await?;  // SAVEPOINT が自動作成される
 
    let result = sqlx::query!(
        "UPDATE inventory SET quantity = quantity - 1 WHERE product_id = $1 AND quantity > 0",
        product_id
    )
    .execute(&mut *savepoint)
    .await?;
 
    if result.rows_affected() == 0 {
        // 在庫不足の場合、SAVEPOINT までロールバック
        // savepoint は Drop 時に自動ロールバック
        drop(savepoint);
    } else {
        savepoint.commit().await?;  // SAVEPOINT をリリース
    }
 
    // 外側のトランザクションはコミット
    sqlx::query!("INSERT INTO audit_log (action) VALUES ('end_batch')")
        .execute(&mut *tx)
        .await?;
 
    tx.commit().await?;
    Ok(())
}

2.5 ストリーミングクエリ

大量のデータを扱う場合、全行をメモリにロードするのではなく、ストリーミングで1行ずつ処理できる。

use sqlx::PgPool;
use futures::TryStreamExt;  // try_next() を使うために必要
 
/// 100万行のデータをストリーミングで処理
async fn process_large_dataset(pool: &PgPool) -> Result<(), sqlx::Error> {
    let mut stream = sqlx::query_as!(
        User,
        "SELECT id, name, email, created_at FROM users WHERE created_at > $1",
        cutoff_date,
    )
    .fetch(pool);  // fetch() はストリームを返す(fetch_all() ではない)
 
    let mut count = 0u64;
    while let Some(user) = stream.try_next().await? {
        // 1行ずつ処理 — メモリ使用量は一定
        process_user(&user).await?;
        count += 1;
 
        if count % 10_000 == 0 {
            tracing::info!("Processed {} users", count);
        }
    }
 
    tracing::info!("Total processed: {} users", count);
    Ok(())
}

2.6 動的クエリの構築

query! マクロは静的 SQL にしか対応しないが、QueryBuilder を使えば動的に SQL を組み立てられる。

use sqlx::{PgPool, QueryBuilder, Postgres};
 
/// 動的な検索条件でユーザーを検索
async fn search_users(
    pool: &PgPool,
    name_filter: Option<&str>,
    email_filter: Option<&str>,
    min_created_at: Option<DateTime<Utc>>,
    order_by: &str,
    limit: i64,
) -> Result<Vec<User>, sqlx::Error> {
    let mut builder: QueryBuilder<Postgres> = QueryBuilder::new(
        "SELECT id, name, email, created_at FROM users WHERE 1=1"
    );
 
    if let Some(name) = name_filter {
        builder.push(" AND name ILIKE ");
        builder.push_bind(format!("%{}%", name));
    }
 
    if let Some(email) = email_filter {
        builder.push(" AND email ILIKE ");
        builder.push_bind(format!("%{}%", email));
    }
 
    if let Some(min_date) = min_created_at {
        builder.push(" AND created_at >= ");
        builder.push_bind(min_date);
    }
 
    // ORDER BY はバインド変数にできないため、ホワイトリストで検証
    let safe_order = match order_by {
        "name" => "name",
        "email" => "email",
        "created_at" => "created_at",
        _ => "created_at",  // デフォルト
    };
    builder.push(format!(" ORDER BY {} DESC", safe_order));
 
    builder.push(" LIMIT ");
    builder.push_bind(limit);
 
    builder
        .build_query_as::<User>()
        .fetch_all(pool)
        .await
}

2.7 バルクインサート

大量のレコードを効率的に挿入する方法。

use sqlx::{PgPool, QueryBuilder, Postgres};
 
/// バルクインサート — 1回のクエリで複数行を挿入
async fn bulk_insert_users(
    pool: &PgPool,
    users: &[(String, String)],  // (name, email)
) -> Result<u64, sqlx::Error> {
    // PostgreSQL の場合、VALUES 句に複数行を指定
    let mut builder: QueryBuilder<Postgres> = QueryBuilder::new(
        "INSERT INTO users (id, name, email, created_at) "
    );
 
    builder.push_values(users.iter(), |mut b, (name, email)| {
        b.push_bind(Uuid::new_v4())
         .push_bind(name)
         .push_bind(email)
         .push("NOW()");
    });
 
    let result = builder.build().execute(pool).await?;
    Ok(result.rows_affected())
}
 
/// UNNEST を使ったさらに高効率なバルクインサート
async fn bulk_insert_with_unnest(
    pool: &PgPool,
    names: &[String],
    emails: &[String],
) -> Result<u64, sqlx::Error> {
    let ids: Vec<Uuid> = (0..names.len()).map(|_| Uuid::new_v4()).collect();
 
    let result = sqlx::query!(
        r#"
        INSERT INTO users (id, name, email, created_at)
        SELECT * FROM UNNEST($1::uuid[], $2::text[], $3::text[])
        AS t(id, name, email),
        LATERAL (SELECT NOW() AS created_at) ts
        "#,
        &ids,
        names,
        emails,
    )
    .execute(pool)
    .await?;
 
    Ok(result.rows_affected())
}

2.8 カスタム型のマッピング

sqlx で PostgreSQL のカスタム型(ENUM、複合型)をマッピングする方法。

use sqlx::Type;
 
// PostgreSQL ENUM 型のマッピング
// CREATE TYPE user_role AS ENUM ('admin', 'moderator', 'user');
#[derive(Debug, Clone, PartialEq, Type)]
#[sqlx(type_name = "user_role", rename_all = "lowercase")]
pub enum UserRole {
    Admin,
    Moderator,
    User,
}
 
#[derive(Debug, FromRow)]
struct UserWithRole {
    id: Uuid,
    name: String,
    email: String,
    role: UserRole,
    created_at: DateTime<Utc>,
}
 
async fn find_admins(pool: &PgPool) -> Result<Vec<UserWithRole>, sqlx::Error> {
    sqlx::query_as!(
        UserWithRole,
        r#"
        SELECT id, name, email, role as "role: UserRole", created_at
        FROM users
        WHERE role = $1
        "#,
        UserRole::Admin as UserRole,
    )
    .fetch_all(pool)
    .await
}
 
// JSON 型のマッピング
use sqlx::types::Json;
use serde::{Serialize, Deserialize};
 
#[derive(Debug, Serialize, Deserialize)]
struct UserPreferences {
    theme: String,
    language: String,
    notifications_enabled: bool,
}
 
#[derive(Debug, FromRow)]
struct UserWithPrefs {
    id: Uuid,
    name: String,
    preferences: Json<UserPreferences>,
}
 
async fn update_preferences(
    pool: &PgPool,
    user_id: Uuid,
    prefs: &UserPreferences,
) -> Result<(), sqlx::Error> {
    sqlx::query!(
        "UPDATE users SET preferences = $1 WHERE id = $2",
        Json(prefs) as _,
        user_id,
    )
    .execute(pool)
    .await?;
    Ok(())
}

3. diesel — 型安全な DSL ベース ORM

3.1 セットアップ

# Cargo.toml
[dependencies]
diesel = { version = "2.2", features = ["postgres", "chrono", "uuid"] }
diesel_migrations = "2.2"
dotenvy = "0.15"

3.2 スキーマとモデル定義

// src/schema.rs (diesel CLI が自動生成)
diesel::table! {
    users (id) {
        id -> Uuid,
        name -> Varchar,
        email -> Varchar,
        created_at -> Timestamptz,
        updated_at -> Timestamptz,
    }
}
 
diesel::table! {
    posts (id) {
        id -> Uuid,
        user_id -> Uuid,
        title -> Varchar,
        body -> Text,
        published -> Bool,
        created_at -> Timestamptz,
    }
}
 
diesel::joinable!(posts -> users (user_id));
diesel::allow_tables_to_appear_in_same_query!(users, posts);
// src/models.rs
use diesel::prelude::*;
use uuid::Uuid;
use chrono::{DateTime, Utc};
 
#[derive(Queryable, Selectable, Debug)]
#[diesel(table_name = crate::schema::users)]
pub struct User {
    pub id: Uuid,
    pub name: String,
    pub email: String,
    pub created_at: DateTime<Utc>,
    pub updated_at: DateTime<Utc>,
}
 
#[derive(Insertable)]
#[diesel(table_name = crate::schema::users)]
pub struct NewUser<'a> {
    pub name: &'a str,
    pub email: &'a str,
}

3.3 CRUD 操作

use diesel::prelude::*;
use crate::schema::users;
use crate::models::{User, NewUser};
 
// INSERT
fn create_user(conn: &mut PgConnection, name: &str, email: &str) -> QueryResult<User> {
    let new_user = NewUser { name, email };
 
    diesel::insert_into(users::table)
        .values(&new_user)
        .returning(User::as_returning())
        .get_result(conn)
}
 
// SELECT with filter
fn find_user_by_email(conn: &mut PgConnection, email_addr: &str) -> QueryResult<User> {
    users::table
        .filter(users::email.eq(email_addr))
        .select(User::as_select())
        .first(conn)
}
 
// SELECT with JOIN
fn get_user_with_posts(conn: &mut PgConnection, user_id: Uuid) -> QueryResult<Vec<(User, Post)>> {
    users::table
        .inner_join(posts::table)
        .filter(users::id.eq(user_id))
        .filter(posts::published.eq(true))
        .select((User::as_select(), Post::as_select()))
        .load(conn)
}
 
// UPDATE
fn update_user_email(conn: &mut PgConnection, user_id: Uuid, new_email: &str) -> QueryResult<User> {
    diesel::update(users::table.filter(users::id.eq(user_id)))
        .set(users::email.eq(new_email))
        .returning(User::as_returning())
        .get_result(conn)
}
 
// DELETE
fn delete_user(conn: &mut PgConnection, user_id: Uuid) -> QueryResult<usize> {
    diesel::delete(users::table.filter(users::id.eq(user_id)))
        .execute(conn)
}

3.4 diesel-async による非同期対応

diesel は本来同期的だが、diesel-async クレートを使うと非同期で操作できる。

# Cargo.toml
[dependencies]
diesel = { version = "2.2", features = ["postgres", "chrono", "uuid"] }
diesel-async = { version = "0.5", features = ["postgres", "deadpool"] }
deadpool = "0.12"
use diesel_async::{AsyncPgConnection, RunQueryDsl, AsyncConnection};
use diesel_async::pooled_connection::deadpool::Pool;
use diesel_async::pooled_connection::AsyncDieselConnectionManager;
 
type DbPool = Pool<AsyncPgConnection>;
 
/// 非同期コネクションプールの作成
fn create_pool(database_url: &str) -> DbPool {
    let config = AsyncDieselConnectionManager::<AsyncPgConnection>::new(database_url);
    Pool::builder(config)
        .max_size(16)
        .build()
        .expect("Failed to create pool")
}
 
/// 非同期での CRUD 操作
async fn create_user_async(
    pool: &DbPool,
    name: &str,
    email: &str,
) -> QueryResult<User> {
    let mut conn = pool.get().await.expect("Failed to get connection");
 
    let new_user = NewUser { name, email };
    diesel::insert_into(users::table)
        .values(&new_user)
        .returning(User::as_returning())
        .get_result(&mut conn)
        .await
}
 
/// 非同期でのトランザクション
async fn transfer_with_diesel_async(
    pool: &DbPool,
    from_id: Uuid,
    to_id: Uuid,
    amount: i64,
) -> QueryResult<()> {
    let mut conn = pool.get().await.expect("Failed to get connection");
 
    conn.transaction::<_, diesel::result::Error, _>(|conn| {
        Box::pin(async move {
            diesel::update(accounts::table.filter(accounts::id.eq(from_id)))
                .set(accounts::balance.eq(accounts::balance - amount))
                .execute(conn)
                .await?;
 
            diesel::update(accounts::table.filter(accounts::id.eq(to_id)))
                .set(accounts::balance.eq(accounts::balance + amount))
                .execute(conn)
                .await?;
 
            Ok(())
        })
    })
    .await
}

3.5 カスタムクエリとページネーション

use diesel::prelude::*;
use diesel::dsl::count_star;
 
/// ページネーション付きの検索
fn paginated_users(
    conn: &mut PgConnection,
    page: i64,
    per_page: i64,
    name_filter: Option<&str>,
) -> QueryResult<(Vec<User>, i64)> {
    let mut query = users::table.into_boxed();  // boxed() で動的クエリを有効化
 
    if let Some(name) = name_filter {
        query = query.filter(users::name.ilike(format!("%{}%", name)));
    }
 
    // 総件数の取得
    let total = users::table
        .select(count_star())
        .first::<i64>(conn)?;
 
    // ページネーション
    let results = query
        .order(users::created_at.desc())
        .limit(per_page)
        .offset((page - 1) * per_page)
        .select(User::as_select())
        .load(conn)?;
 
    Ok((results, total))
}
 
/// GROUP BY と集計関数
fn user_post_counts(conn: &mut PgConnection) -> QueryResult<Vec<(Uuid, String, i64)>> {
    users::table
        .left_join(posts::table)
        .group_by((users::id, users::name))
        .select((users::id, users::name, diesel::dsl::count(posts::id.nullable())))
        .order(diesel::dsl::count(posts::id.nullable()).desc())
        .load::<(Uuid, String, i64)>(conn)
}
 
/// サブクエリの使用
fn users_with_recent_posts(conn: &mut PgConnection) -> QueryResult<Vec<User>> {
    let recent_posters = posts::table
        .filter(posts::created_at.gt(chrono::Utc::now() - chrono::Duration::days(7)))
        .select(posts::user_id);
 
    users::table
        .filter(users::id.eq_any(recent_posters))
        .select(User::as_select())
        .load(conn)
}

3.6 diesel のマイグレーション管理

# diesel CLI のインストール
cargo install diesel_cli --no-default-features --features postgres
 
# プロジェクト初期化(diesel.toml と migrations/ ディレクトリ作成)
diesel setup
 
# マイグレーション作成
diesel migration generate create_users
 
# マイグレーション実行
diesel migration run
 
# マイグレーション巻き戻し
diesel migration revert
 
# マイグレーション状態確認
diesel migration list
 
# スキーマの再生成
diesel print-schema > src/schema.rs
-- migrations/2026-01-01-000000_create_users/up.sql
CREATE TABLE users (
    id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
    name VARCHAR(255) NOT NULL,
    email VARCHAR(255) NOT NULL UNIQUE,
    created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
    updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
 
-- migrations/2026-01-01-000000_create_users/down.sql
DROP TABLE users;

diesel のマイグレーションは up.sqldown.sql のペアで管理される。ロールバック可能なマイグレーションを書くことで、開発時のスキーマ変更が容易になる。


4. SeaORM — ActiveRecord パターンの非同期 ORM

4.1 セットアップ

# Cargo.toml
[dependencies]
sea-orm = { version = "1.0", features = [
    "sqlx-postgres",
    "runtime-tokio-rustls",
    "macros",
] }

4.2 エンティティ定義

// src/entities/user.rs
use sea_orm::entity::prelude::*;
 
#[derive(Clone, Debug, PartialEq, DeriveEntityModel)]
#[sea_orm(table_name = "users")]
pub struct Model {
    #[sea_orm(primary_key)]
    pub id: Uuid,
    pub name: String,
    pub email: String,
    pub created_at: DateTimeUtc,
}
 
#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
pub enum Relation {
    #[sea_orm(has_many = "super::post::Entity")]
    Posts,
}
 
impl Related<super::post::Entity> for Entity {
    fn to() -> RelationDef {
        Relation::Posts.def()
    }
}
 
impl ActiveModelBehavior for ActiveModel {}

4.3 CRUD 操作

use sea_orm::*;
use crate::entities::{user, post};
 
// INSERT
async fn create_user(db: &DatabaseConnection, name: &str, email: &str) -> Result<user::Model, DbErr> {
    let new_user = user::ActiveModel {
        id: Set(Uuid::new_v4()),
        name: Set(name.to_string()),
        email: Set(email.to_string()),
        created_at: Set(chrono::Utc::now()),
        ..Default::default()
    };
 
    new_user.insert(db).await
}
 
// SELECT with filter and pagination
async fn list_users(
    db: &DatabaseConnection,
    page: u64,
    per_page: u64,
) -> Result<(Vec<user::Model>, u64), DbErr> {
    let paginator = user::Entity::find()
        .order_by_desc(user::Column::CreatedAt)
        .paginate(db, per_page);
 
    let total = paginator.num_pages().await?;
    let users = paginator.fetch_page(page).await?;
 
    Ok((users, total))
}
 
// SELECT with JOIN (Eager Loading)
async fn get_user_with_posts(
    db: &DatabaseConnection,
    user_id: Uuid,
) -> Result<Option<(user::Model, Vec<post::Model>)>, DbErr> {
    let result = user::Entity::find_by_id(user_id)
        .find_with_related(post::Entity)
        .all(db)
        .await?;
 
    Ok(result.into_iter().next())
}
 
// UPDATE
async fn update_email(
    db: &DatabaseConnection,
    user_id: Uuid,
    new_email: &str,
) -> Result<user::Model, DbErr> {
    let mut user: user::ActiveModel = user::Entity::find_by_id(user_id)
        .one(db)
        .await?
        .ok_or(DbErr::RecordNotFound("User not found".into()))?
        .into();
 
    user.email = Set(new_email.to_string());
    user.update(db).await
}
 
// DELETE
async fn delete_user(db: &DatabaseConnection, user_id: Uuid) -> Result<DeleteResult, DbErr> {
    user::Entity::delete_by_id(user_id).exec(db).await
}

4.4 SeaORM の高度なクエリ

use sea_orm::*;
use sea_orm::sea_query::{Expr, Func};
 
/// 条件付き動的クエリ
async fn search_users_sea(
    db: &DatabaseConnection,
    name_filter: Option<String>,
    email_filter: Option<String>,
    page: u64,
    per_page: u64,
) -> Result<(Vec<user::Model>, u64), DbErr> {
    let mut query = user::Entity::find();
 
    if let Some(name) = name_filter {
        query = query.filter(user::Column::Name.contains(&name));
    }
 
    if let Some(email) = email_filter {
        query = query.filter(user::Column::Email.contains(&email));
    }
 
    let paginator = query
        .order_by_desc(user::Column::CreatedAt)
        .paginate(db, per_page);
 
    let total = paginator.num_pages().await?;
    let users = paginator.fetch_page(page).await?;
 
    Ok((users, total))
}
 
/// カスタム SELECT とグループ化
async fn user_post_stats(
    db: &DatabaseConnection,
) -> Result<Vec<(Uuid, String, i64)>, DbErr> {
    #[derive(Debug, FromQueryResult)]
    struct UserPostCount {
        user_id: Uuid,
        user_name: String,
        post_count: i64,
    }
 
    let results = user::Entity::find()
        .select_only()
        .column(user::Column::Id)
        .column(user::Column::Name)
        .column_as(post::Column::Id.count(), "post_count")
        .join(JoinType::LeftJoin, user::Relation::Posts.def())
        .group_by(user::Column::Id)
        .group_by(user::Column::Name)
        .order_by_desc(Expr::col(post::Column::Id).count())
        .into_model::<UserPostCount>()
        .all(db)
        .await?;
 
    Ok(results.into_iter().map(|r| (r.user_id, r.user_name, r.post_count)).collect())
}
 
/// トランザクション
async fn create_user_with_post(
    db: &DatabaseConnection,
    name: &str,
    email: &str,
    post_title: &str,
    post_body: &str,
) -> Result<(user::Model, post::Model), DbErr> {
    let txn = db.begin().await?;
 
    let new_user = user::ActiveModel {
        id: Set(Uuid::new_v4()),
        name: Set(name.to_string()),
        email: Set(email.to_string()),
        created_at: Set(chrono::Utc::now()),
        ..Default::default()
    };
    let user = new_user.insert(&txn).await?;
 
    let new_post = post::ActiveModel {
        id: Set(Uuid::new_v4()),
        user_id: Set(user.id),
        title: Set(post_title.to_string()),
        body: Set(post_body.to_string()),
        published: Set(false),
        created_at: Set(chrono::Utc::now()),
        ..Default::default()
    };
    let post = new_post.insert(&txn).await?;
 
    txn.commit().await?;
    Ok((user, post))
}

4.5 SeaORM CLI とコード生成

# sea-orm-cli のインストール
cargo install sea-orm-cli
 
# データベースからエンティティを自動生成
sea-orm-cli generate entity \
    --database-url "postgres://user:pass@localhost:5432/mydb" \
    --output-dir src/entities \
    --with-serde both \
    --date-time-crate chrono
 
# マイグレーション作成
sea-orm-cli migrate generate create_users_table
 
# マイグレーション実行
sea-orm-cli migrate up
 
# マイグレーション巻き戻し
sea-orm-cli migrate down

エンティティの自動生成は既存のデータベーススキーマから Rust コードを生成するため、特にレガシーデータベースとの統合時に非常に便利である。


5. 比較表

5.1 機能比較

機能 sqlx diesel SeaORM
パラダイム 生 SQL + マクロ DSL ベース ORM ActiveRecord ORM
非同期対応 ネイティブ diesel-async で対応 ネイティブ
コンパイル時検証 SQL 構文 + 型 DSL レベルの型安全 なし(ランタイム検証)
マイグレーション SQL ファイル DSL or SQL SeaORM CLI
PostgreSQL 対応 対応 対応
MySQL 対応 対応 対応
SQLite 対応 対応 対応
トランザクション 対応 対応 対応
接続プール 組み込み 外部 (r2d2/deadpool) 組み込み (sqlx ベース)
学習コスト 低(SQL が書ければ OK) 中(DSL の習得が必要) 中(エンティティ定義の理解)
コミュニティ

5.2 パフォーマンス特性比較

観点 sqlx diesel SeaORM
クエリ生成オーバーヘッド なし(生SQL) 小(DSL→SQL変換) 中(ORM抽象化)
コンパイル時間 中(マクロ展開) 長(型推論が重い)
バイナリサイズ
N+1 問題対策 手動(SQL制御) 手動(JOIN記述) find_with_related
バルクインサート query + unnest insert_into().values(&vec) insert_many()
生SQLフォールバック デフォルト sql_query() FromQueryResult

6. 接続プールの設計と最適化

6.1 接続プールの基本原則

データベース接続はリソースコストが高いため、接続プールを使って再利用する。プールサイズの設定はアプリケーションのパフォーマンスに直接影響する。

use sqlx::postgres::PgPoolOptions;
 
/// プロダクション向けの接続プール設定
async fn create_production_pool(database_url: &str) -> Result<PgPool, sqlx::Error> {
    PgPoolOptions::new()
        // 最大接続数: CPU コア数 * 2 + ディスクスピンドル数が目安
        // 一般的には 10-20 が適切
        .max_connections(20)
        // 最小接続数: アイドル時にも維持する接続数
        .min_connections(5)
        // 接続タイムアウト: プールから接続を取得するまでの最大待ち時間
        .acquire_timeout(std::time::Duration::from_secs(5))
        // アイドルタイムアウト: 使われていない接続を閉じるまでの時間
        .idle_timeout(std::time::Duration::from_secs(600))
        // 接続の最大生存時間: 古い接続を定期的にリフレッシュ
        .max_lifetime(std::time::Duration::from_secs(1800))
        // 接続時に実行される SQL(接続の検証やセッション設定)
        .after_connect(|conn, _meta| {
            Box::pin(async move {
                sqlx::query("SET timezone = 'UTC'")
                    .execute(conn)
                    .await?;
                sqlx::query("SET statement_timeout = '30s'")
                    .execute(conn)
                    .await?;
                Ok(())
            })
        })
        .connect(database_url)
        .await
}

6.2 接続プールサイズの決定

接続プールサイズは以下の要因を考慮して決定する。

推奨プールサイズの計算式:

  pool_size = (CPU コア数 * 2) + 有効ディスクスピンドル数

例:
  4コア CPU + SSD (1 スピンドル相当)
  → pool_size = (4 * 2) + 1 = 9 ≈ 10

注意事項:
  - PostgreSQL のデフォルト最大接続数は 100
  - 複数のアプリケーションインスタンスがある場合は合計を考慮
  - pool_size = max_connections(DB) / app_instances - margin

6.3 接続プールの監視

use sqlx::PgPool;
use tracing::info;
 
/// 接続プールの状態をログに出力
async fn log_pool_status(pool: &PgPool) {
    let size = pool.size();
    let idle = pool.num_idle();
    let acquired = size - idle as u32;
 
    info!(
        pool.size = size,
        pool.idle = idle,
        pool.acquired = acquired,
        "Connection pool status"
    );
 
    // プール使用率が 80% を超えたら警告
    if (acquired as f64 / size as f64) > 0.8 {
        tracing::warn!(
            "Connection pool utilization is high: {}/{}",
            acquired,
            size
        );
    }
}
 
/// ヘルスチェック用のエンドポイント
async fn health_check(pool: &PgPool) -> Result<(), sqlx::Error> {
    sqlx::query("SELECT 1")
        .execute(pool)
        .await?;
    Ok(())
}

7. テスト戦略

7.1 テスト用データベースの管理

データベースのテストでは、各テストが独立して実行できるようにすることが重要である。

/// テスト用のデータベースを作成するヘルパー
async fn create_test_database() -> PgPool {
    let db_name = format!("test_{}", Uuid::new_v4().to_string().replace('-', ""));
    let admin_url = "postgres://user:pass@localhost:5432/postgres";
 
    // テスト用データベースを作成
    let admin_pool = PgPool::connect(admin_url).await.unwrap();
    sqlx::query(&format!("CREATE DATABASE {}", db_name))
        .execute(&admin_pool)
        .await
        .unwrap();
 
    // テスト用データベースに接続してマイグレーション実行
    let test_url = format!("postgres://user:pass@localhost:5432/{}", db_name);
    let pool = PgPool::connect(&test_url).await.unwrap();
    sqlx::migrate!("./migrations")
        .run(&pool)
        .await
        .unwrap();
 
    pool
}
 
/// テスト終了後にデータベースを削除
async fn cleanup_test_database(pool: PgPool, db_name: &str) {
    pool.close().await;
 
    let admin_url = "postgres://user:pass@localhost:5432/postgres";
    let admin_pool = PgPool::connect(admin_url).await.unwrap();
    sqlx::query(&format!("DROP DATABASE IF EXISTS {}", db_name))
        .execute(&admin_pool)
        .await
        .unwrap();
}

7.2 トランザクションベースのテスト

各テストをトランザクション内で実行し、終了後にロールバックすることで高速かつクリーンなテストを実現する。

#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::PgPool;
 
    /// トランザクション内でテストを実行するヘルパー
    /// テスト終了時に自動ロールバックされる
    async fn with_test_tx<F, Fut>(pool: &PgPool, f: F)
    where
        F: FnOnce(sqlx::Transaction<'_, sqlx::Postgres>) -> Fut,
        Fut: std::future::Future<Output = ()>,
    {
        let tx = pool.begin().await.unwrap();
        f(tx).await;
        // tx は Drop 時に自動ロールバック
    }
 
    #[sqlx::test]
    async fn test_create_user(pool: PgPool) {
        // sqlx::test マクロはテスト用DBを自動管理
        let user = create_user(&pool, "Test User", "test@example.com")
            .await
            .unwrap();
 
        assert_eq!(user.name, "Test User");
        assert_eq!(user.email, "test@example.com");
    }
 
    #[sqlx::test(fixtures("users"))]
    async fn test_list_users(pool: PgPool) {
        // fixtures ディレクトリから初期データを投入
        let users = list_users(&pool, 10).await.unwrap();
        assert!(!users.is_empty());
    }
 
    #[sqlx::test]
    async fn test_update_email(pool: PgPool) {
        let user = create_user(&pool, "Test", "old@example.com")
            .await
            .unwrap();
 
        let updated = update_email(&pool, user.id, "new@example.com")
            .await
            .unwrap();
 
        assert!(updated);
 
        let found = sqlx::query_as!(
            User,
            "SELECT id, name, email, created_at FROM users WHERE id = $1",
            user.id
        )
        .fetch_one(&pool)
        .await
        .unwrap();
 
        assert_eq!(found.email, "new@example.com");
    }
}

7.3 diesel のテスト

#[cfg(test)]
mod tests {
    use diesel::prelude::*;
    use diesel::Connection;
 
    /// diesel でのトランザクションベーステスト
    #[test]
    fn test_create_user_diesel() {
        let mut conn = PgConnection::establish("postgres://...")
            .expect("Failed to connect");
 
        // test_transaction はトランザクション内でテストを実行し、
        // 終了後に自動ロールバックする
        conn.test_transaction::<_, diesel::result::Error, _>(|conn| {
            let user = create_user(conn, "Test", "test@example.com")?;
            assert_eq!(user.name, "Test");
            assert_eq!(user.email, "test@example.com");
 
            let found = find_user_by_email(conn, "test@example.com")?;
            assert_eq!(found.id, user.id);
 
            Ok(())
        });
    }
}

7.4 テストフィクスチャの管理

-- fixtures/users.sql(sqlx::test の fixtures で使用)
INSERT INTO users (id, name, email, created_at) VALUES
    ('550e8400-e29b-41d4-a716-446655440001', 'Alice', 'alice@example.com', '2026-01-01 00:00:00+00'),
    ('550e8400-e29b-41d4-a716-446655440002', 'Bob', 'bob@example.com', '2026-01-02 00:00:00+00'),
    ('550e8400-e29b-41d4-a716-446655440003', 'Charlie', 'charlie@example.com', '2026-01-03 00:00:00+00');
/// テストデータビルダーパターン
struct UserBuilder {
    name: String,
    email: String,
}
 
impl UserBuilder {
    fn new() -> Self {
        Self {
            name: "Default User".to_string(),
            email: format!("user-{}@example.com", Uuid::new_v4()),
        }
    }
 
    fn name(mut self, name: &str) -> Self {
        self.name = name.to_string();
        self
    }
 
    fn email(mut self, email: &str) -> Self {
        self.email = email.to_string();
        self
    }
 
    async fn build(self, pool: &PgPool) -> User {
        create_user(pool, &self.name, &self.email)
            .await
            .expect("Failed to create test user")
    }
}
 
// 使用例
#[sqlx::test]
async fn test_with_builder(pool: PgPool) {
    let alice = UserBuilder::new()
        .name("Alice")
        .email("alice@test.com")
        .build(&pool)
        .await;
 
    let bob = UserBuilder::new()
        .name("Bob")
        .build(&pool)  // email は自動生成
        .await;
 
    assert_ne!(alice.id, bob.id);
}

8. アンチパターン

8.1 N+1 クエリ問題

// NG: ユーザーごとに投稿を個別取得(N+1)
async fn bad_get_users_with_posts(pool: &PgPool) -> Result<Vec<(User, Vec<Post>)>> {
    let users = sqlx::query_as!(User, "SELECT * FROM users")
        .fetch_all(pool).await?;
 
    let mut result = Vec::new();
    for user in users {
        // ユーザー数だけクエリが発行される!
        let posts = sqlx::query_as!(Post,
            "SELECT * FROM posts WHERE user_id = $1", user.id
        ).fetch_all(pool).await?;
        result.push((user, posts));
    }
    Ok(result)
}
 
// OK: JOIN で1回のクエリで取得
async fn good_get_users_with_posts(pool: &PgPool) -> Result<Vec<UserWithPosts>> {
    sqlx::query_as!(UserWithPosts,
        r#"
        SELECT u.id, u.name, u.email,
               COALESCE(json_agg(p.*) FILTER (WHERE p.id IS NOT NULL), '[]') as "posts!: Json<Vec<Post>>"
        FROM users u
        LEFT JOIN posts p ON p.user_id = u.id
        GROUP BY u.id
        "#
    ).fetch_all(pool).await
}

8.2 トランザクション未使用での複数操作

// NG: トランザクションなしで関連データを操作
async fn bad_transfer(pool: &PgPool, from: Uuid, to: Uuid, amount: i64) -> Result<()> {
    sqlx::query!("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from)
        .execute(pool).await?;
    // ← ここで障害が発生すると残高が消失!
    sqlx::query!("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to)
        .execute(pool).await?;
    Ok(())
}
 
// OK: トランザクションで原子性を保証
async fn good_transfer(pool: &PgPool, from: Uuid, to: Uuid, amount: i64) -> Result<()> {
    let mut tx = pool.begin().await?;
 
    sqlx::query!("UPDATE accounts SET balance = balance - $1 WHERE id = $2", amount, from)
        .execute(&mut *tx).await?;
    sqlx::query!("UPDATE accounts SET balance = balance + $1 WHERE id = $2", amount, to)
        .execute(&mut *tx).await?;
 
    tx.commit().await?;  // 両方成功した場合のみコミット
    Ok(())
}

8.3 接続プールの枯渇

// NG: 接続を長時間保持する
async fn bad_long_running(pool: &PgPool) -> Result<()> {
    let mut tx = pool.begin().await?;  // 接続を1つ確保
 
    // 外部 API の呼び出しなど、DB と無関係な処理
    let response = reqwest::get("https://api.example.com/slow-endpoint")
        .await?;  // ← この間ずっと DB 接続を保持!
 
    sqlx::query!("INSERT INTO results (data) VALUES ($1)", response.text().await?)
        .execute(&mut *tx).await?;
 
    tx.commit().await?;
    Ok(())
}
 
// OK: DB 操作と非 DB 操作を分離
async fn good_minimal_connection(pool: &PgPool) -> Result<()> {
    // 1. まず外部 API を呼び出す(DB 接続不要)
    let response = reqwest::get("https://api.example.com/slow-endpoint")
        .await?;
    let data = response.text().await?;
 
    // 2. DB 操作は最小限の時間で完了
    sqlx::query!("INSERT INTO results (data) VALUES ($1)", data)
        .execute(pool).await?;
 
    Ok(())
}

8.4 不適切なインデックス設計

// NG: インデックスなしのカラムで頻繁に検索
async fn bad_search(pool: &PgPool, status: &str) -> Result<Vec<Order>> {
    // status カラムにインデックスがない場合、全テーブルスキャンになる
    sqlx::query_as!(Order, "SELECT * FROM orders WHERE status = $1", status)
        .fetch_all(pool).await
}
 
// OK: 適切なインデックスをマイグレーションで追加
// migrations/xxx_add_orders_status_index.sql:
// CREATE INDEX CONCURRENTLY idx_orders_status ON orders (status);
//
// 複合インデックスの場合はカラム順序が重要:
// CREATE INDEX idx_orders_status_date ON orders (status, created_at DESC);
// ↑ WHERE status = ? ORDER BY created_at DESC のクエリに最適

8.5 SELECT * の乱用

// NG: 不要なカラムも含めて全カラム取得
async fn bad_get_names(pool: &PgPool) -> Result<Vec<String>> {
    let users = sqlx::query_as!(User, "SELECT * FROM users")  // 全カラム取得
        .fetch_all(pool).await?;
    Ok(users.into_iter().map(|u| u.name).collect())  // name しか使わない
}
 
// OK: 必要なカラムだけ取得
async fn good_get_names(pool: &PgPool) -> Result<Vec<String>> {
    let rows = sqlx::query_scalar!("SELECT name FROM users")
        .fetch_all(pool).await?;
    Ok(rows)
}

9. パフォーマンスチューニング

9.1 EXPLAIN ANALYZE による実行計画の分析

/// クエリの実行計画を取得するユーティリティ
async fn explain_query(pool: &PgPool, query: &str) -> Result<String, sqlx::Error> {
    let rows = sqlx::query_scalar::<_, String>(
        &format!("EXPLAIN (ANALYZE, BUFFERS, FORMAT TEXT) {}", query)
    )
    .fetch_all(pool)
    .await?;
 
    Ok(rows.join("\n"))
}
 
// 使用例
async fn debug_slow_query(pool: &PgPool) {
    let plan = explain_query(
        pool,
        "SELECT u.*, count(p.id) FROM users u LEFT JOIN posts p ON p.user_id = u.id GROUP BY u.id"
    ).await.unwrap();
 
    println!("Query Plan:\n{}", plan);
    // Seq Scan が出ている場合はインデックスの追加を検討
    // Nested Loop が出ている場合は JOIN の最適化を検討
}

9.2 プリペアドステートメントのキャッシュ

// sqlx の query! マクロはプリペアドステートメントを自動的にキャッシュする。
// 手動でプリペアドステートメントを管理する必要がある場合:
 
use sqlx::Statement;
 
async fn cached_query_example(pool: &PgPool) -> Result<Vec<User>, sqlx::Error> {
    // プリペアドステートメントは接続ごとにキャッシュされる
    // query_as! マクロを使う場合は自動管理されるため手動管理は不要
 
    // query() の場合は persistent() で制御可能
    sqlx::query_as::<_, User>("SELECT id, name, email, created_at FROM users WHERE email = $1")
        .bind("test@example.com")
        .persistent(true)  // プリペアドステートメントをキャッシュ(デフォルト: true)
        .fetch_all(pool)
        .await
}

9.3 バッチ処理とパイプライン

/// 大量のデータを効率的にバッチ処理
async fn batch_update_status(
    pool: &PgPool,
    user_ids: &[Uuid],
    new_status: &str,
) -> Result<u64, sqlx::Error> {
    // 一度に更新する数を制限してデッドロックリスクを軽減
    const BATCH_SIZE: usize = 1000;
    let mut total_affected = 0u64;
 
    for chunk in user_ids.chunks(BATCH_SIZE) {
        let result = sqlx::query!(
            "UPDATE users SET status = $1 WHERE id = ANY($2)",
            new_status,
            chunk,
        )
        .execute(pool)
        .await?;
 
        total_affected += result.rows_affected();
    }
 
    Ok(total_affected)
}
 
/// COPY プロトコルによる超高速バルクロード(PostgreSQL 固有)
async fn bulk_load_with_copy(
    pool: &PgPool,
    csv_data: &str,
) -> Result<u64, sqlx::Error> {
    let mut conn = pool.acquire().await?;
 
    let copy_result = sqlx::query(
        "COPY users (name, email) FROM STDIN WITH (FORMAT CSV, HEADER true)"
    )
    .execute(&mut *conn)
    .await?;
 
    Ok(copy_result.rows_affected())
}

9.4 読み取りレプリカの活用

use sqlx::PgPool;
 
/// 読み取り/書き込みの分離パターン
struct DatabasePools {
    writer: PgPool,   // プライマリ(書き込み用)
    reader: PgPool,   // レプリカ(読み取り用)
}
 
impl DatabasePools {
    async fn new(
        writer_url: &str,
        reader_url: &str,
    ) -> Result<Self, sqlx::Error> {
        let writer = PgPoolOptions::new()
            .max_connections(10)
            .connect(writer_url)
            .await?;
 
        let reader = PgPoolOptions::new()
            .max_connections(20)  // 読み取りの方が多いため大きく設定
            .connect(reader_url)
            .await?;
 
        Ok(Self { writer, reader })
    }
 
    /// 書き込み操作はプライマリに送信
    fn writer(&self) -> &PgPool {
        &self.writer
    }
 
    /// 読み取り操作はレプリカに送信
    fn reader(&self) -> &PgPool {
        &self.reader
    }
}
 
// Axum での使用例
async fn list_users_handler(
    State(pools): State<DatabasePools>,
) -> Json<Vec<User>> {
    let users = sqlx::query_as!(User, "SELECT * FROM users LIMIT 50")
        .fetch_all(pools.reader())  // レプリカから読み取り
        .await
        .unwrap();
    Json(users)
}
 
async fn create_user_handler(
    State(pools): State<DatabasePools>,
    Json(payload): Json<CreateUserRequest>,
) -> Json<User> {
    let user = create_user(pools.writer(), &payload.name, &payload.email)  // プライマリに書き込み
        .await
        .unwrap();
    Json(user)
}

10. エラーハンドリングのベストプラクティス

10.1 カスタムエラー型の定義

use thiserror::Error;
 
#[derive(Debug, Error)]
pub enum AppError {
    #[error("Database error: {0}")]
    Database(#[from] sqlx::Error),
 
    #[error("Record not found: {entity} with id {id}")]
    NotFound { entity: String, id: String },
 
    #[error("Duplicate entry: {field} = {value}")]
    DuplicateEntry { field: String, value: String },
 
    #[error("Constraint violation: {0}")]
    ConstraintViolation(String),
 
    #[error("Connection pool exhausted")]
    PoolExhausted,
}
 
impl From<sqlx::Error> for AppError {
    fn from(err: sqlx::Error) -> Self {
        match &err {
            sqlx::Error::RowNotFound => AppError::NotFound {
                entity: "unknown".to_string(),
                id: "unknown".to_string(),
            },
            sqlx::Error::Database(db_err) => {
                // PostgreSQL エラーコードに基づく分類
                if let Some(code) = db_err.code() {
                    match code.as_ref() {
                        "23505" => AppError::DuplicateEntry {
                            field: db_err.constraint()
                                .unwrap_or("unknown")
                                .to_string(),
                            value: "unknown".to_string(),
                        },
                        "23503" => AppError::ConstraintViolation(
                            db_err.message().to_string()
                        ),
                        _ => AppError::Database(err),
                    }
                } else {
                    AppError::Database(err)
                }
            }
            sqlx::Error::PoolTimedOut => AppError::PoolExhausted,
            _ => AppError::Database(err),
        }
    }
}

10.2 リトライロジック

use std::time::Duration;
use tokio::time::sleep;
 
/// デッドロックやタイムアウト時の自動リトライ
async fn with_retry<F, Fut, T>(
    max_retries: u32,
    base_delay: Duration,
    f: F,
) -> Result<T, sqlx::Error>
where
    F: Fn() -> Fut,
    Fut: std::future::Future<Output = Result<T, sqlx::Error>>,
{
    let mut retries = 0;
 
    loop {
        match f().await {
            Ok(result) => return Ok(result),
            Err(err) if is_retryable(&err) && retries < max_retries => {
                retries += 1;
                let delay = base_delay * 2u32.pow(retries - 1);  // 指数バックオフ
                let jitter = Duration::from_millis(rand::random::<u64>() % 100);
                tracing::warn!(
                    retry = retries,
                    max_retries = max_retries,
                    "Retryable database error, waiting {:?}",
                    delay + jitter
                );
                sleep(delay + jitter).await;
            }
            Err(err) => return Err(err),
        }
    }
}
 
/// リトライ可能なエラーかどうかを判定
fn is_retryable(err: &sqlx::Error) -> bool {
    match err {
        sqlx::Error::PoolTimedOut => true,
        sqlx::Error::Database(db_err) => {
            if let Some(code) = db_err.code() {
                matches!(
                    code.as_ref(),
                    "40001"   // serialization_failure
                    | "40P01" // deadlock_detected
                    | "57P03" // cannot_connect_now
                    | "08006" // connection_failure
                )
            } else {
                false
            }
        }
        sqlx::Error::Io(_) => true,  // ネットワークエラー
        _ => false,
    }
}
 
// 使用例
async fn reliable_transfer(
    pool: &PgPool,
    from: Uuid,
    to: Uuid,
    amount: i64,
) -> Result<(), sqlx::Error> {
    with_retry(3, Duration::from_millis(100), || async {
        let mut tx = pool.begin().await?;
 
        sqlx::query!(
            "UPDATE accounts SET balance = balance - $1 WHERE id = $2",
            amount, from
        ).execute(&mut *tx).await?;
 
        sqlx::query!(
            "UPDATE accounts SET balance = balance + $1 WHERE id = $2",
            amount, to
        ).execute(&mut *tx).await?;
 
        tx.commit().await
    }).await
}

11. 実践的なリポジトリパターン

11.1 リポジトリトレイトの定義

use async_trait::async_trait;
 
/// リポジトリパターン — DB 実装の抽象化
#[async_trait]
pub trait UserRepository: Send + Sync {
    async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, AppError>;
    async fn find_by_email(&self, email: &str) -> Result<Option<User>, AppError>;
    async fn list(&self, page: i64, per_page: i64) -> Result<(Vec<User>, i64), AppError>;
    async fn create(&self, name: &str, email: &str) -> Result<User, AppError>;
    async fn update(&self, id: Uuid, name: &str, email: &str) -> Result<User, AppError>;
    async fn delete(&self, id: Uuid) -> Result<bool, AppError>;
}
 
/// sqlx による実装
pub struct PgUserRepository {
    pool: PgPool,
}
 
impl PgUserRepository {
    pub fn new(pool: PgPool) -> Self {
        Self { pool }
    }
}
 
#[async_trait]
impl UserRepository for PgUserRepository {
    async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, AppError> {
        let user = sqlx::query_as!(
            User,
            "SELECT id, name, email, created_at FROM users WHERE id = $1",
            id,
        )
        .fetch_optional(&self.pool)
        .await?;
 
        Ok(user)
    }
 
    async fn find_by_email(&self, email: &str) -> Result<Option<User>, AppError> {
        let user = sqlx::query_as!(
            User,
            "SELECT id, name, email, created_at FROM users WHERE email = $1",
            email,
        )
        .fetch_optional(&self.pool)
        .await?;
 
        Ok(user)
    }
 
    async fn list(&self, page: i64, per_page: i64) -> Result<(Vec<User>, i64), AppError> {
        let total = sqlx::query_scalar!("SELECT COUNT(*) FROM users")
            .fetch_one(&self.pool)
            .await?
            .unwrap_or(0);
 
        let users = sqlx::query_as!(
            User,
            "SELECT id, name, email, created_at FROM users ORDER BY created_at DESC LIMIT $1 OFFSET $2",
            per_page,
            (page - 1) * per_page,
        )
        .fetch_all(&self.pool)
        .await?;
 
        Ok((users, total))
    }
 
    async fn create(&self, name: &str, email: &str) -> Result<User, AppError> {
        let user = sqlx::query_as!(
            User,
            r#"
            INSERT INTO users (id, name, email, created_at)
            VALUES ($1, $2, $3, NOW())
            RETURNING id, name, email, created_at
            "#,
            Uuid::new_v4(),
            name,
            email,
        )
        .fetch_one(&self.pool)
        .await?;
 
        Ok(user)
    }
 
    async fn update(&self, id: Uuid, name: &str, email: &str) -> Result<User, AppError> {
        let user = sqlx::query_as!(
            User,
            r#"
            UPDATE users SET name = $1, email = $2, updated_at = NOW()
            WHERE id = $3
            RETURNING id, name, email, created_at
            "#,
            name,
            email,
            id,
        )
        .fetch_optional(&self.pool)
        .await?
        .ok_or(AppError::NotFound {
            entity: "User".to_string(),
            id: id.to_string(),
        })?;
 
        Ok(user)
    }
 
    async fn delete(&self, id: Uuid) -> Result<bool, AppError> {
        let result = sqlx::query!("DELETE FROM users WHERE id = $1", id)
            .execute(&self.pool)
            .await?;
 
        Ok(result.rows_affected() > 0)
    }
}

11.2 テスト用モックリポジトリ

use std::sync::{Arc, Mutex};
use std::collections::HashMap;
 
/// テスト用のインメモリリポジトリ
pub struct MockUserRepository {
    users: Arc<Mutex<HashMap<Uuid, User>>>,
}
 
impl MockUserRepository {
    pub fn new() -> Self {
        Self {
            users: Arc::new(Mutex::new(HashMap::new())),
        }
    }
}
 
#[async_trait]
impl UserRepository for MockUserRepository {
    async fn find_by_id(&self, id: Uuid) -> Result<Option<User>, AppError> {
        let users = self.users.lock().unwrap();
        Ok(users.get(&id).cloned())
    }
 
    async fn find_by_email(&self, email: &str) -> Result<Option<User>, AppError> {
        let users = self.users.lock().unwrap();
        Ok(users.values().find(|u| u.email == email).cloned())
    }
 
    async fn list(&self, page: i64, per_page: i64) -> Result<(Vec<User>, i64), AppError> {
        let users = self.users.lock().unwrap();
        let total = users.len() as i64;
        let offset = ((page - 1) * per_page) as usize;
        let limit = per_page as usize;
 
        let mut sorted: Vec<User> = users.values().cloned().collect();
        sorted.sort_by(|a, b| b.created_at.cmp(&a.created_at));
 
        let page_users = sorted.into_iter().skip(offset).take(limit).collect();
        Ok((page_users, total))
    }
 
    async fn create(&self, name: &str, email: &str) -> Result<User, AppError> {
        let mut users = self.users.lock().unwrap();
        let user = User {
            id: Uuid::new_v4(),
            name: name.to_string(),
            email: email.to_string(),
            created_at: chrono::Utc::now(),
        };
        users.insert(user.id, user.clone());
        Ok(user)
    }
 
    async fn update(&self, id: Uuid, name: &str, email: &str) -> Result<User, AppError> {
        let mut users = self.users.lock().unwrap();
        let user = users.get_mut(&id)
            .ok_or(AppError::NotFound {
                entity: "User".to_string(),
                id: id.to_string(),
            })?;
 
        user.name = name.to_string();
        user.email = email.to_string();
        Ok(user.clone())
    }
 
    async fn delete(&self, id: Uuid) -> Result<bool, AppError> {
        let mut users = self.users.lock().unwrap();
        Ok(users.remove(&id).is_some())
    }
}
 
// テストでの使用例
#[tokio::test]
async fn test_user_service_with_mock() {
    let repo = MockUserRepository::new();
    let service = UserService::new(Arc::new(repo));
 
    let user = service.register("Alice", "alice@example.com").await.unwrap();
    assert_eq!(user.name, "Alice");
 
    let found = service.find_by_email("alice@example.com").await.unwrap();
    assert!(found.is_some());
}

12. FAQ

Q1. sqlx の query! マクロはどうやってコンパイル時に SQL を検証している?

A. query! マクロはコンパイル時に DATABASE_URL 環境変数で指定された実際のデータベースに接続し、PREPARE ステートメントを使って SQL の構文とカラム型を検証する。CI/CD ではオフラインモード(sqlx prepare で生成した .sqlx/ ディレクトリ)を使うことで DB 接続なしでビルド可能。

# オフライン用のクエリメタデータ生成
cargo sqlx prepare
# → .sqlx/ ディレクトリにメタデータが保存される(Git にコミット)

Q2. diesel と sqlx を同じプロジェクトで併用できる?

A. 技術的には可能だが推奨しない。依存関係が増え、接続プールの管理が複雑になる。複雑なクエリだけ生 SQL を使いたい場合、diesel の sql_query() や sqlx のように生 SQL を書ける機能で対応する方が良い。

Q3. Web フレームワーク(Axum/Actix Web)との統合はどうする?

A. Axum の場合、State でプールを共有するのが一般的。

use axum::{extract::State, routing::get, Router, Json};
use sqlx::PgPool;
 
async fn list_users(State(pool): State<PgPool>) -> Json<Vec<User>> {
    let users = sqlx::query_as!(User, "SELECT * FROM users LIMIT 50")
        .fetch_all(&pool)
        .await
        .unwrap();
    Json(users)
}
 
#[tokio::main]
async fn main() {
    let pool = PgPool::connect("postgres://...").await.unwrap();
 
    let app = Router::new()
        .route("/users", get(list_users))
        .with_state(pool);
 
    let listener = tokio::net::TcpListener::bind("0.0.0.0:3000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}

Q4. マイグレーションの本番運用はどうする?

A. マイグレーションはアプリケーション起動時に自動実行するか、CI/CD パイプラインで実行する。

// アプリケーション起動時にマイグレーションを実行
#[tokio::main]
async fn main() {
    let pool = PgPool::connect("postgres://...").await.unwrap();
 
    // マイグレーションの実行
    sqlx::migrate!("./migrations")
        .run(&pool)
        .await
        .expect("Failed to run migrations");
 
    // アプリケーションの起動
    start_server(pool).await;
}

本番環境では以下の点に注意する:

  • ゼロダウンタイム: ALTER TABLE はロックを取得するため、大きなテーブルでは ALTER TABLE ... ADD COLUMN ... DEFAULT ... を使う(PostgreSQL 11+ では即座に完了)
  • ロールバック計画: 各マイグレーションに対応するロールバック SQL を用意する
  • テスト: ステージング環境で事前にマイグレーションを検証する

Q5. 大量データのページネーションで OFFSET が遅い場合は?

A. OFFSET は指定された行数分をスキャンするため、大きなオフセットでは遅くなる。Cursor-based ページネーションを使う。

/// Cursor-based ページネーション(OFFSET を使わない)
async fn list_users_cursor(
    pool: &PgPool,
    cursor: Option<DateTime<Utc>>,  // 前ページの最後の created_at
    limit: i64,
) -> Result<Vec<User>, sqlx::Error> {
    match cursor {
        Some(after) => {
            sqlx::query_as!(
                User,
                r#"
                SELECT id, name, email, created_at
                FROM users
                WHERE created_at < $1
                ORDER BY created_at DESC
                LIMIT $2
                "#,
                after,
                limit,
            )
            .fetch_all(pool)
            .await
        }
        None => {
            sqlx::query_as!(
                User,
                r#"
                SELECT id, name, email, created_at
                FROM users
                ORDER BY created_at DESC
                LIMIT $1
                "#,
                limit,
            )
            .fetch_all(pool)
            .await
        }
    }
}

Q6. 複数データベースへの対応(マルチテナント)は?

A. マルチテナントの実装方法はいくつかある。

/// スキーマベースのマルチテナント(PostgreSQL)
async fn with_tenant_schema(
    pool: &PgPool,
    tenant_id: &str,
) -> Result<Vec<User>, sqlx::Error> {
    let mut conn = pool.acquire().await?;
 
    // テナントのスキーマに切り替え
    sqlx::query(&format!("SET search_path TO tenant_{}, public", tenant_id))
        .execute(&mut *conn)
        .await?;
 
    // 以降のクエリはテナントのスキーマで実行される
    sqlx::query_as!(User, "SELECT id, name, email, created_at FROM users")
        .fetch_all(&mut *conn)
        .await
}
 
/// データベース分離型マルチテナント
use std::collections::HashMap;
use std::sync::Arc;
use tokio::sync::RwLock;
 
struct TenantPools {
    pools: Arc<RwLock<HashMap<String, PgPool>>>,
}
 
impl TenantPools {
    async fn get_pool(&self, tenant_id: &str) -> Option<PgPool> {
        let pools = self.pools.read().await;
        pools.get(tenant_id).cloned()
    }
 
    async fn add_tenant(&self, tenant_id: &str, database_url: &str) -> Result<(), sqlx::Error> {
        let pool = PgPool::connect(database_url).await?;
        let mut pools = self.pools.write().await;
        pools.insert(tenant_id.to_string(), pool);
        Ok(())
    }
}

FAQ

Q1: このトピックを学ぶ上で最も重要なポイントは何ですか?

実践的な経験を積むことが最も重要です。理論だけでなく、実際にコードを書いて動作を確認することで理解が深まります。

Q2: 初心者がよく陥る間違いは何ですか?

基礎を飛ばして応用に進むことです。このガイドで説明している基本概念をしっかり理解してから、次のステップに進むことをお勧めします。

Q3: 実務ではどのように活用されていますか?

このトピックの知識は、日常的な開発業務で頻繁に活用されます。特にコードレビューやアーキテクチャ設計の際に重要になります。


13. まとめ

項目 ポイント
sqlx 生 SQL 派向け、コンパイル時検証、非同期ネイティブ
diesel 型安全 DSL、コンパイル時保証最強、同期メイン(async 拡張あり)
SeaORM ActiveRecord パターン、Rails 的な開発体験、非同期ネイティブ
選定基準 SQL 制御 → sqlx、型安全 → diesel、生産性 → SeaORM
接続プール 適切なサイズ設定、監視、タイムアウト設定が重要
テスト トランザクションベースのテスト、リポジトリパターンでモック可能に
エラーハンドリング PostgreSQL エラーコードの分類、リトライロジックの実装
パフォーマンス N+1 回避、Cursor-based ページネーション、EXPLAIN ANALYZE 活用
共通注意点 N+1 回避、トランザクション活用、接続プールの適切な設定

次に読むべきガイド

  • Rust 非同期プログラミングガイド — tokio ランタイムとの統合
  • Axum Web フレームワークガイド — DB 層との接続パターン
  • SQL パフォーマンスチューニング — インデックス設計とクエリ最適化

参考文献

  1. sqlx 公式リポジトリhttps://github.com/launchbadge/sqlx
  2. diesel 公式サイト — "Getting Started with Diesel" — https://diesel.rs/guides/getting-started
  3. SeaORM 公式ドキュメントhttps://www.sea-ql.org/SeaORM/docs/introduction/
  4. The Rust Programming Language — "Async/Await" — https://doc.rust-lang.org/book/
  5. diesel-asynchttps://github.com/weiznich/diesel_async
  6. PostgreSQL 公式ドキュメント — "Connection Pooling" — https://www.postgresql.org/docs/current/runtime-config-connection.html
  7. Use The Index, Luke — SQL インデックス設計ガイド — https://use-the-index-luke.com/