File Watching
Online: https://thekevinscott.github.io/dirsql/guide/watching
dirsql can monitor the filesystem for changes and emit events when rows are inserted, updated, or deleted. This is useful for building reactive applications that respond to file changes in real time.
From the CLI
The dirsql HTTP server streams the same events over GET /events as a Server-Sent Events stream. Each data: payload uses the same JSON schema described in Event types below. See the CLI section for the full server setup.
Starting a watch stream
from dirsql import DirSQL, Table
import json
db = DirSQL(
"./my-project",
tables=[
Table(
ddl="CREATE TABLE comments (id TEXT, body TEXT, author TEXT)",
glob="comments/**/*.json",
extract=lambda path: [json.loads(open(path, encoding="utf-8").read())],
),
],
)
async for event in db.watch():
print(f"{event.action} on {event.table}: {event.row}")
use dirsql::{DirSQL, RowEvent, Table, Value};
use futures::StreamExt;
use std::collections::HashMap;
// See `row_from_json` in getting-started.md for a reusable helper.
/// Parses `raw` as JSON and turns a top-level JSON object into a row map.
///
/// Each member becomes one column: strings map to `Value::Text`, numbers to
/// `Value::Integer` when they fit in an `i64` and to `Value::Real` otherwise,
/// booleans to `Value::Integer` (0 or 1), and `null` to `Value::Null`. Arrays
/// and nested objects are stored as their JSON text.
///
/// Returns an empty map when the parsed value is not a JSON object.
///
/// # Panics
///
/// Panics if `raw` is not valid JSON.
fn row_from_json(raw: &str) -> HashMap<String, Value> {
    let parsed: serde_json::Value = serde_json::from_str(raw).unwrap();
    let mut row = HashMap::new();
    if let serde_json::Value::Object(fields) = parsed {
        for (column, field) in fields {
            let cell = match field {
                serde_json::Value::Null => Value::Null,
                serde_json::Value::Bool(flag) => Value::Integer(i64::from(flag)),
                serde_json::Value::Number(num) => match num.as_i64() {
                    Some(int) => Value::Integer(int),
                    // Not representable as i64: fall back to a float, or 0.0
                    // when even that conversion is unavailable.
                    None => Value::Real(num.as_f64().unwrap_or(0.0)),
                },
                serde_json::Value::String(text) => Value::Text(text),
                // Arrays and objects are kept as serialized JSON text.
                nested => Value::Text(nested.to_string()),
            };
            row.insert(column, cell);
        }
    }
    row
}
let db = DirSQL::new(
"./my-project",
vec![
Table::new(
"CREATE TABLE comments (id TEXT, body TEXT, author TEXT)",
"comments/**/*.json",
|path| vec![row_from_json(&std::fs::read_to_string(path).unwrap())],
),
],
)?;
let mut stream = db.watch()?;
while let Some(event) = stream.next().await {
// Handle each kind of row event; every arm prints a one-line summary.
match event {
    RowEvent::Insert { table, row, file_path } => {
        println!("insert on {table} ({file_path}): {row:?}")
    }
    RowEvent::Update { table, old_row, new_row, file_path } => {
        println!("update on {table} ({file_path}): {old_row:?} -> {new_row:?}")
    }
    RowEvent::Delete { table, row, file_path } => {
        println!("delete on {table} ({file_path}): {row:?}")
    }
    // `Error` also carries an optional `table`; `..` skips it here, because a
    // struct-variant pattern must either name every field or end with `..`.
    RowEvent::Error { file_path, error, .. } => {
        println!("error on {file_path:?}: {error}")
    }
}
}
import { readFileSync } from 'node:fs';
import { DirSQL, type TableDef } from 'dirsql';
const tables: TableDef[] = [
{
ddl: 'CREATE TABLE comments (id TEXT, body TEXT, author TEXT)',
glob: 'comments/**/*.json',
extract: (path) => [JSON.parse(readFileSync(path, 'utf8'))],
},
];
const db = new DirSQL({ root: './my-project', tables });
for await (const event of db.watch()) {
console.log(`${event.action} on ${event.table}:`, event.row);
}
See Async API for full details on the async DirSQL API (Python).
Event types
Each event is a RowEvent object with these attributes:
insert
A new row was added. This happens when a new file is created or an existing file gains additional rows.
event.action # "insert"
event.table # "comments"
event.row # {"id": "abc", "body": "new comment", "author": "alice"}
event.old_row # None
event.file_path # "comments/abc/index.json"
// RowEvent is an enum; match on the variant to destructure its fields.
RowEvent::Insert {
table, // "comments"
row, // {"id": "abc", "body": "new comment", "author": "alice"}
file_path, // "comments/abc/index.json"
} => { /* ... */ }
event.action // 'insert'
event.table // 'comments'
event.row // { id: 'abc', body: 'new comment', author: 'alice' }
event.oldRow // undefined
event.filePath // 'comments/abc/index.json'
update
An existing row was modified. dirsql diffs the old and new rows extracted from the file to detect changes.
event.action # "update"
event.table # "comments"
event.row # {"id": "abc", "body": "edited comment", "author": "alice"}
event.old_row # {"id": "abc", "body": "original comment", "author": "alice"}
event.file_path # "comments/abc/index.json"
RowEvent::Update {
table, // "comments"
old_row, // {"id": "abc", "body": "original comment", "author": "alice"}
new_row, // {"id": "abc", "body": "edited comment", "author": "alice"}
file_path, // "comments/abc/index.json"
} => { /* ... */ }
event.action // 'update'
event.table // 'comments'
event.row // { id: 'abc', body: 'edited comment', author: 'alice' }
event.oldRow // { id: 'abc', body: 'original comment', author: 'alice' }
event.filePath // 'comments/abc/index.json'
delete
A row was removed. This happens when a file is deleted or a file is modified to contain fewer rows.
event.action # "delete"
event.table # "comments"
event.row # {"id": "abc", "body": "deleted comment", "author": "alice"}
event.old_row # None
event.file_path # "comments/abc/index.json"
RowEvent::Delete {
table, // "comments"
row, // {"id": "abc", "body": "deleted comment", "author": "alice"}
file_path, // "comments/abc/index.json"
} => { /* ... */ }
event.action // 'delete'
event.table // 'comments'
event.row // { id: 'abc', body: 'deleted comment', author: 'alice' }
event.oldRow // undefined
event.filePath // 'comments/abc/index.json'
error
An error occurred while processing a file change. The file was modified but the extract function failed, or the file could not be read.
event.action # "error"
event.table # "comments" (or None if the error isn't tied to a table)
event.error # "Extract error: ..."
event.file_path # "comments/abc/index.json"
event.row # None
// `table` is `Option<String>`: `Some("comments")` when the failing
// file matched a table's glob; `None` for errors that aren't tied
// to a specific table (e.g. a watch-channel failure).
RowEvent::Error {
table, // Some("comments")
file_path, // PathBuf, e.g. "comments/abc/index.json"
error, // "Extract error: ..."
} => { /* ... */ }
event.action // 'error'
event.table // 'comments' (or null if the error isn't tied to a table)
event.error // 'Extract error: ...'
event.filePath // 'comments/abc/index.json'
event.row // undefined
How diffing works
When a file changes, dirsql:
- Re-reads the file and calls the extract function to get new rows
- Compares new rows against the previously extracted rows for that file
- Emits insert, update, and delete events based on the diff
- Updates the in-memory database to reflect the new state
Row identity is determined by position (row index within the file). If a file previously produced 3 rows and now produces 2, the first two rows are compared for updates and the third is emitted as a delete.
Filesystem events
Under the hood, dirsql uses the notify crate (inotify on Linux, FSEvents on macOS, ReadDirectoryChangesW on Windows) to receive filesystem events. Events are coalesced and filtered through the table matcher before being processed.
Files that do not match any table glob or that match an ignore pattern are silently skipped.