File Watching

Online: https://thekevinscott.github.io/dirsql/guide/watching

dirsql can monitor the filesystem for changes and emit events when rows are inserted, updated, or deleted. This is useful for building reactive applications that respond to file changes in real time.

From the CLI

The dirsql HTTP server exposes the same events at GET /events as a Server-Sent Events stream. Each data: payload uses the JSON schema described in Event types below. See the CLI section for the full server setup.
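
As a rough illustration of consuming such a stream, the sketch below parses Server-Sent Events lines into JSON payloads. The function name `iter_sse_json` and the sample payloads are hypothetical; only the `data:` framing comes from the SSE format, and the payload fields shown are placeholders rather than the confirmed dirsql schema.

```python
import json
from typing import Iterable, Iterator


def iter_sse_json(lines: Iterable[str]) -> Iterator[dict]:
    """Yield one decoded JSON object per Server-Sent Event.

    An event ends at a blank line; multiple `data:` lines within one
    event are joined with newlines before decoding.
    """
    data_parts: list[str] = []
    for raw in lines:
        line = raw.rstrip("\r\n")
        if line == "":
            # Blank line: dispatch the buffered event, if any.
            if data_parts:
                yield json.loads("\n".join(data_parts))
                data_parts = []
            continue
        if line.startswith(":"):
            continue  # SSE comment, often used as a keep-alive
        field, _, value = line.partition(":")
        if field == "data":
            # A single leading space after the colon is not part of the value.
            data_parts.append(value[1:] if value.startswith(" ") else value)
    # An unterminated trailing event is dropped.


# Hypothetical payloads, for illustration only.
sample = [
    'data: {"action": "insert", "table": "comments"}\n',
    "\n",
    ": keep-alive comment\n",
    'data: {"action": "delete", "table": "comments"}\n',
    "\n",
]
for payload in iter_sse_json(sample):
    print(payload["action"], payload["table"])
```

Against a running server, the same function could be fed the decoded lines of the HTTP response body, for example from `urllib.request.urlopen` pointed at the server's /events URL. The host and port depend on how the server was started.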

Starting a watch stream

python
import asyncio
import json

from dirsql import DirSQL, Table

db = DirSQL(
    "./my-project",
    tables=[
        Table(
            ddl="CREATE TABLE comments (id TEXT, body TEXT, author TEXT)",
            glob="comments/**/*.json",
            extract=lambda path: [json.loads(open(path, encoding="utf-8").read())],
        ),
    ],
)


async def main():
    # `async for` is only valid inside a coroutine, so wrap the loop.
    async for event in db.watch():
        print(f"{event.action} on {event.table}: {event.row}")


asyncio.run(main())
rust
use dirsql::{DirSQL, RowEvent, Table, Value};
use futures::StreamExt;
use std::collections::HashMap;

// See `row_from_json` in getting-started.md for a reusable helper.
fn row_from_json(raw: &str) -> HashMap<String, Value> {
    let v: serde_json::Value = serde_json::from_str(raw).unwrap();
    let serde_json::Value::Object(obj) = v else { return HashMap::new() };
    obj.into_iter()
        .map(|(k, val)| {
            let v = match val {
                serde_json::Value::String(s) => Value::Text(s),
                serde_json::Value::Number(n) => n
                    .as_i64()
                    .map(Value::Integer)
                    .unwrap_or_else(|| Value::Real(n.as_f64().unwrap_or(0.0))),
                serde_json::Value::Bool(b) => Value::Integer(b as i64),
                serde_json::Value::Null => Value::Null,
                other => Value::Text(other.to_string()),
            };
            (k, v)
        })
        .collect()
}

let db = DirSQL::new(
    "./my-project",
    vec![
        Table::new(
            "CREATE TABLE comments (id TEXT, body TEXT, author TEXT)",
            "comments/**/*.json",
            |path| vec![row_from_json(&std::fs::read_to_string(path).unwrap())],
        ),
    ],
)?;

let mut stream = db.watch()?;
while let Some(event) = stream.next().await {
    match event {
        RowEvent::Insert { table, row, file_path } => {
            println!("insert on {table} ({file_path}): {row:?}")
        }
        RowEvent::Update { table, old_row, new_row, file_path } => {
            println!("update on {table} ({file_path}): {old_row:?} -> {new_row:?}")
        }
        RowEvent::Delete { table, row, file_path } => {
            println!("delete on {table} ({file_path}): {row:?}")
        }
        RowEvent::Error { file_path, error, .. } => {
            println!("error on {file_path:?}: {error}")
        }
    }
}
typescript
import { readFileSync } from 'node:fs';
import { DirSQL, type TableDef } from 'dirsql';

const tables: TableDef[] = [
  {
    ddl: 'CREATE TABLE comments (id TEXT, body TEXT, author TEXT)',
    glob: 'comments/**/*.json',
    extract: (path) => [JSON.parse(readFileSync(path, 'utf8'))],
  },
];

const db = new DirSQL({ root: './my-project', tables });

for await (const event of db.watch()) {
  console.log(`${event.action} on ${event.table}:`, event.row);
}

See Async API for full details on the async DirSQL API (Python).

Event types

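Each event is a RowEvent. In Python and TypeScript, the action attribute identifies the event type; in Rust, RowEvent is an enum with one variant per type. The four types are: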

insert

A new row was added. This happens when a new file is created or an existing file gains additional rows.

python
event.action   # "insert"
event.table    # "comments"
event.row      # {"id": "abc", "body": "new comment", "author": "alice"}
event.old_row  # None
event.file_path # "comments/abc/index.json"
rust
// RowEvent is an enum; match on the variant to destructure its fields.
RowEvent::Insert {
    table,     // "comments"
    row,       // {"id": "abc", "body": "new comment", "author": "alice"}
    file_path, // "comments/abc/index.json"
} => { /* ... */ }
typescript
event.action   // 'insert'
event.table    // 'comments'
event.row      // { id: 'abc', body: 'new comment', author: 'alice' }
event.oldRow   // undefined
event.filePath // 'comments/abc/index.json'

update

An existing row was modified. dirsql diffs the old and new rows extracted from the file to detect changes.

python
event.action   # "update"
event.table    # "comments"
event.row      # {"id": "abc", "body": "edited comment", "author": "alice"}
event.old_row  # {"id": "abc", "body": "original comment", "author": "alice"}
event.file_path # "comments/abc/index.json"
rust
RowEvent::Update {
    table,     // "comments"
    old_row,   // {"id": "abc", "body": "original comment", "author": "alice"}
    new_row,   // {"id": "abc", "body": "edited comment", "author": "alice"}
    file_path, // "comments/abc/index.json"
} => { /* ... */ }
typescript
event.action   // 'update'
event.table    // 'comments'
event.row      // { id: 'abc', body: 'edited comment', author: 'alice' }
event.oldRow   // { id: 'abc', body: 'original comment', author: 'alice' }
event.filePath // 'comments/abc/index.json'

delete

A row was removed. This happens when a file is deleted or a file is modified to contain fewer rows.

python
event.action   # "delete"
event.table    # "comments"
event.row      # {"id": "abc", "body": "deleted comment", "author": "alice"}
event.old_row  # None
event.file_path # "comments/abc/index.json"
rust
RowEvent::Delete {
    table,     // "comments"
    row,       // {"id": "abc", "body": "deleted comment", "author": "alice"}
    file_path, // "comments/abc/index.json"
} => { /* ... */ }
typescript
event.action   // 'delete'
event.table    // 'comments'
event.row      // { id: 'abc', body: 'deleted comment', author: 'alice' }
event.oldRow   // undefined
event.filePath // 'comments/abc/index.json'

error

An error occurred while processing a file change. This happens when the extract function fails on a modified file or when the file cannot be read.

python
event.action    # "error"
event.table     # "comments" (or None if the error isn't tied to a table)
event.error     # "Extract error: ..."
event.file_path # "comments/abc/index.json"
event.row       # None
rust
// `table` is `Option<String>`: `Some("comments")` when the failing
// file matched a table's glob; `None` for errors that aren't tied
// to a specific table (e.g. a watch-channel failure).
RowEvent::Error {
    table,     // Some("comments")
    file_path, // PathBuf, e.g. "comments/abc/index.json"
    error,     // "Extract error: ..."
} => { /* ... */ }
typescript
event.action   // 'error'
event.table    // 'comments' (or null if the error isn't tied to a table)
event.error    // 'Extract error: ...'
event.filePath // 'comments/abc/index.json'
event.row      // undefined
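
One way to consume these four shapes in Python is a single dispatch function keyed on `event.action`. The sketch below assumes only the attributes listed above; `describe_event` and the `SimpleNamespace` stand-in used for the demo are hypothetical and not part of dirsql.

```python
from types import SimpleNamespace


def describe_event(event) -> str:
    """Return a one-line summary of a RowEvent-like object."""
    if event.action == "insert":
        return f"+ {event.table} {event.file_path}: {event.row}"
    if event.action == "update":
        # Report which columns differ between the old and new row.
        changed = sorted(
            key for key in event.row if event.old_row.get(key) != event.row[key]
        )
        return f"~ {event.table} {event.file_path}: changed {changed}"
    if event.action == "delete":
        return f"- {event.table} {event.file_path}: {event.row}"
    if event.action == "error":
        # `table` may be None when the error is not tied to a table.
        return f"! {event.table} {event.file_path}: {event.error}"
    raise ValueError(f"unexpected action: {event.action!r}")


# Stand-in object with the same attributes as an update event.
demo = SimpleNamespace(
    action="update",
    table="comments",
    row={"id": "abc", "body": "edited comment", "author": "alice"},
    old_row={"id": "abc", "body": "original comment", "author": "alice"},
    file_path="comments/abc/index.json",
)
print(describe_event(demo))
```

Inside a watch loop, the same function could be called on each event as it arrives.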

How diffing works

When a file changes, dirsql:

  1. Re-reads the file and calls the extract function to get new rows
  2. Compares new rows against the previously extracted rows for that file
  3. Emits insert, update, and delete events based on the diff
  4. Updates the in-memory database to reflect the new state

Row identity is determined by position (the row's index within the file). If a file previously produced 3 rows and now produces 2, the first two rows are compared for updates and the third is emitted as a delete. Conversely, if a file gains rows, the rows beyond the previous count are emitted as inserts.
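
The positional rule can be sketched in a few lines of Python. This illustrates the behavior described here and is not dirsql's actual implementation; `diff_rows` is a hypothetical helper, and it assumes that rows which are equal at the same index produce no event.

```python
from itertools import zip_longest

_MISSING = object()  # sentinel for "no row at this index"


def diff_rows(old_rows, new_rows):
    """Compare two row lists by index.

    Returns (action, row, old_row) tuples in index order.
    """
    events = []
    for old, new in zip_longest(old_rows, new_rows, fillvalue=_MISSING):
        if old is _MISSING:
            events.append(("insert", new, None))  # index only exists now
        elif new is _MISSING:
            events.append(("delete", old, None))  # index no longer exists
        elif old != new:
            events.append(("update", new, old))  # same index, new content
    return events


old = [
    {"id": "a", "body": "first"},
    {"id": "b", "body": "second"},
    {"id": "c", "body": "third"},
]
new = [
    {"id": "a", "body": "first"},
    {"id": "b", "body": "second (edited)"},
]
for action, row, old_row in diff_rows(old, new):
    print(action, row, old_row)
```

A consequence of index-based identity is that prepending a row shifts every later row by one position, so under this rule such a change surfaces as a series of updates plus a final insert rather than a single insert.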

Filesystem events

Under the hood, dirsql uses the notify crate (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows) to receive filesystem events. Events are coalesced and filtered through the table matcher before being processed.

Files that do not match any table glob or that match an ignore pattern are silently skipped.

Released under the MIT License.