EDRSR: Data Pipeline for 60 Million Court Decisions
60 million full texts. 283 GB across 4 shards. Custom RTF parser with depth-tracking for Windows-1251 Cyrillic. Two-phase ETL with idempotent upsert via temp tables. Application-level sharding by doc_id with independent backup domains. PostgreSQL shared memory exhaustion and three layers of defense. All on open government data.
Architecture of an ETL system that transfers the entire Unified State Register of Court Decisions into a 4-shard PostgreSQL infrastructure – from data modeling and RTF parsing to capacity planning and operational trade-offs.
Problem Context
LEX AI is a semantic search platform for court practice. The search core relies on vector embeddings (text-embedding-ada-002, 1536 dim) generated from full decision texts. No text means no embeddings; no embeddings means no semantic search.
EDRSR (Unified State Register of Court Decisions) contains ~60M documents from 685 courts across all instances, from 2006 to present. Full texts are stored in RTF format with Windows-1251 encoding.
Scale:
| Parameter | Value |
|-----------|-------|
| Documents in registry | ~60,000,000 |
| Average RTF size | ~4.5 KB |
| Average plaintext size | ~2.3 KB |
| Total text volume | 283 GB (PostgreSQL) |
| Source courts | 685 |
| Time range | 2006–2026 |
A Principled Decision: Open Data Only
We deliberately chose to work exclusively with open sources. The portal reyestr.court.gov.ua publishes court decisions in open access – this is public information under Ukrainian law on access to public information.
The reason is not just ethical. Commercial APIs carry operational risks: rate limits, token blocking during bulk downloads, third-party dependency. A specific incident: bulk downloading court_sessions (~35K requests in 2.7 hours) got both ZakonOnline API tokens blocked, taking down the production chat.
| Source | What We Get | Access Model |
|--------|-------------|--------------|
| reyestr.court.gov.ua | Full texts in RTF | HTTP GET, rate-limited, free |
| data.gov.ua | Metadata (CSV dumps) | Bulk download, updated daily |
| Commercial APIs | Same + JSON | REST API, paid, tokens get blocked |
Data Model
Before discussing the pipeline, it is worth understanding the target schema. We separated metadata and full texts into two distinct tables – this is a key architectural decision.
Metadata: edrsr_documents
CREATE TABLE edrsr_documents (
    doc_id            BIGINT PRIMARY KEY,  -- PK from EDRSR, auto-increment
    court_code        INTEGER,             -- FK to edrsr_courts (no constraint)
    judgment_code     SMALLINT,            -- decision type (verdict, ruling, resolution)
    justice_kind      SMALLINT,            -- type of proceedings
    category_code     INTEGER,             -- case category (4,106 categories)
    cause_num         TEXT,                -- case number
    adjudication_date TIMESTAMPTZ,         -- date of decision
    receipt_date      TIMESTAMPTZ,         -- date received by registry
    judge             TEXT,                -- judge/panel
    doc_url           TEXT,                -- RTF URL in registry
    status            SMALLINT DEFAULT 0,
    date_publ         TIMESTAMPTZ
);
Deliberate absence of FK constraints. Source data from data.gov.ua contains court_code, justice_kind, category_code values not always present in reference tables. With FK constraints, import breaks on every "dirty" row. Without them – we import everything and validate at the query level.
Why doc_id BIGINT, not UUID? doc_id is a natural key from EDRSR (auto-increment). It grows monotonically, yielding an ideal B-tree with minimal fragmentation during sequential import. UUID would cause random inserts across the entire index – at 60M rows this is a significant I/O difference.
8 indexes on common query patterns: court_code, justice_kind, judgment_code, category_code, cause_num, judge, adjudication_date, receipt_date. Each justified by a real use case (filter by court, by proceedings type, search by case number).
Full Texts: edrsr_fulltext
CREATE TABLE edrsr_fulltext (
    doc_id      BIGINT PRIMARY KEY,       -- join key to edrsr_documents
    full_text   TEXT,                     -- plaintext after RTF conversion
    text_length INTEGER,                  -- pre-computed for filtering
    created_at  TIMESTAMP DEFAULT NOW()
);
Why a separate table, not a column in edrsr_documents? Three reasons:
1. TOAST segmentation. PostgreSQL moves a TEXT value out of line into a separate TOAST table only when the row still exceeds ~2 KB after compression; shorter values stay inline in the heap. With an average plaintext size of ~2.3 KB, a large share of texts would remain inline next to the metadata, so SELECT court_code, cause_num FROM edrsr_documents would read far more heap pages during a sequential scan. Separate table = clean sequential scan on metadata without that overhead.
2. Different lifecycles. Metadata is imported from data.gov.ua CSV dumps (daily updates). Full texts are downloaded from reyestr.court.gov.ua (one-time bulk + incremental). Different sources, different scripts, different frequencies.
3. Independent sharding. Full texts occupy 283 GB vs ~12 GB for metadata. Only texts need sharding; metadata stays in one database.
Reference Tables
6 reference tables: courts (685), instances (3), regions (27), justice_kinds (5), judgment_forms (10+), cause_categories (4,106). Imported once, rarely updated.
Pipeline Architecture
The pipeline is implemented as 4 independent Python scripts. Each is idempotent – can be restarted without data loss or duplicates.
┌─────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ ┌──────────────────────┐
│ 1. Download RTF │ │ 2. Import from HDD │ │ 3. Monitor │ │ 4. Copy to Prod │
│ │ │ │ │ │ │ │
│ asyncio + aiohttp │───▶│ multiprocessing │───▶│ PG aggregate │───▶│ 2-phase ETL │
│ 100 workers │ │ 12 CPU workers │ │ + in-mem cache │ │ 200 psql workers │
│ 3 retries + backoff│ │ COPY FROM STDIN │ │ cross-env stats │ │ TSV chunks on NVMe │
│ │ │ ON CONFLICT NOTHING │ │ │ │ ON CONFLICT NOTHING │
│ reyestr.court.gov │ │ HDD → PG local │ │ local/stage/prod│ │ PG local → PG prod │
│ → /tmp/edrsr-rtf/ │ │ 18 TB /dev/sda1 │ │ │ │ per-shard routing │
└─────────────────────┘ └──────────────────────┘ └──────────────────┘ └──────────────────────┘
Stage 1: RTF Download
I/O model: async HTTP GET → disk write. Network-bound task, hence asyncio + aiohttp with TCPConnector(limit=100, limit_per_host=100).
semaphore = asyncio.Semaphore(100)  # 100 concurrent downloads
# Retry: 3 attempts, linearly increasing backoff (2s, 4s, 6s)
# 429 handling: sleep 5 * (attempt + 1) seconds
Resumability. Before downloading, we check outpath.exists() and outpath.stat().st_size > 0. If the file already exists and is non-empty – skip. This allows restarting the script without re-downloading.
File convention: {doc_id}.rtf – doc_id is the filename. This gives O(1) lookup without a metadata database: int(filename[:-4]) → doc_id.
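A minimal sketch of the download loop described above, combining the semaphore, the retry delays, and the skip-if-present check. It is not the production script: the HTTP call is injected as a hypothetical `fetch` coroutine (in the real pipeline this would wrap an aiohttp GET), the names `download_one` and `retry_delay` are illustrative, and sleeping only between attempts is an assumption.

```python
import asyncio
from pathlib import Path
from typing import Awaitable, Callable, Tuple

# Hypothetical transport: doc_id -> (HTTP status, response body).
Fetch = Callable[[int], Awaitable[Tuple[int, bytes]]]


def retry_delay(attempt: int, status: int) -> float:
    """Seconds to wait after a failed attempt (attempt is 0-based)."""
    if status == 429:
        return 5.0 * (attempt + 1)   # rate limited: 5s, 10s, 15s
    return 2.0 * (attempt + 1)       # other failures: 2s, 4s, 6s


async def download_one(
    doc_id: int,
    fetch: Fetch,
    out_dir: Path,
    sem: asyncio.Semaphore,
    sleep: Callable[[float], Awaitable[None]] = asyncio.sleep,
    attempts: int = 3,
) -> str:
    outpath = out_dir / f"{doc_id}.rtf"
    # Resumability: a non-empty file means this document is already done.
    if outpath.exists() and outpath.stat().st_size > 0:
        return "skipped"
    async with sem:                  # caps the number of in-flight downloads
        for attempt in range(attempts):
            status, body = await fetch(doc_id)
            if status == 200 and body:
                outpath.write_bytes(body)
                return "downloaded"
            if attempt < attempts - 1:
                await sleep(retry_delay(attempt, status))
    return "failed"
```

Injecting `fetch` and `sleep` keeps the retry policy testable without network access; network exceptions are deliberately left out of the sketch.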
RTF Parser: Why Custom
RTF from EDRSR is not ordinary RTF. It is Windows-1251 Cyrillic encoded as \'XX escape sequences inside a latin1 wrapper. Standard libraries (striprtf, pyrtf-ng) do not distinguish Windows-1251 from latin1 bytes and break Cyrillic.
Our parser works in 9 steps:
1. raw bytes → latin1 decode (RTF envelope)
2. Remove nested groups: {\fonttbl ...}, {\colortbl ...},
{\stylesheet ...}, {\info ...}, {\* ...}
(depth-tracking brace parser, O(n))
3. Strip \rtf1 header
4. \par → \n, \line → \n, \tab → \t
5. \'XX → Windows-1251 byte decode
6. \uNNNNN → chr(code), range check 0..0x10FFFF
7. Strip remaining \keyword sequences
8. Remove braces, null bytes, normalize newlines
9. UTF-8 surrogate cleanup: encode('utf-8', errors='surrogatepass')
.decode('utf-8', errors='replace')
Depth-tracking for nested groups. An RTF group {\fonttbl {\f0 Times;}} can have arbitrary nesting depth. The parser tracks {} balance and removes the entire group from opening to closing brace at the same level. Complexity O(n) by document length.
Accuracy: 99.5% on a corpus of ~1,000 manually verified documents. The remaining 0.5% are documents with non-standard RTF extensions (embedded images, OLE objects), where the text is still extracted but with artifacts.
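Steps 2 and 5 of the parser can be sketched as follows. This is an illustrative reconstruction, not the production code: the function names `strip_groups` and `decode_hex_escapes` are hypothetical, and the remaining steps (control-word stripping, \uN handling, surrogate cleanup) are omitted.

```python
import re

# Destination groups that carry no document text (from step 2 of the list).
SKIP_GROUPS = ("{\\fonttbl", "{\\colortbl", "{\\stylesheet", "{\\info", "{\\*")


def strip_groups(rtf: str) -> str:
    """Remove whole groups, nested content included, in one O(n) pass."""
    out = []
    i, n = 0, len(rtf)
    while i < n:
        if rtf[i] == "{" and rtf.startswith(SKIP_GROUPS, i):
            depth = 0
            while i < n:
                ch = rtf[i]
                if ch == "\\" and i + 1 < n:
                    i += 2              # skip control symbols such as \{ and \}
                    continue
                if ch == "{":
                    depth += 1
                elif ch == "}":
                    depth -= 1
                    if depth == 0:      # matching close of the skipped group
                        i += 1
                        break
                i += 1
            continue
        out.append(rtf[i])
        i += 1
    return "".join(out)


# A run of \'XX escapes is decoded as one Windows-1251 byte string.
HEX_RUN = re.compile(r"(?:\\'[0-9a-fA-F]{2})+")


def decode_hex_escapes(text: str) -> str:
    def repl(match: "re.Match[str]") -> str:
        raw = bytes.fromhex(match.group(0).replace("\\'", ""))
        return raw.decode("cp1251", errors="replace")

    return HEX_RUN.sub(repl, text)
```

Decoding whole runs rather than single escapes keeps the step byte-oriented, which is what distinguishes Windows-1251 input from a naive latin1 reading.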
Stage 2: Bulk Import from HDD
This is the pipeline's main workhorse. All RTF files reside on an 18 TB HDD (/dev/sda1), and the script must convert them to text and load into PostgreSQL.
Why multiprocessing, not asyncio? RTF conversion is CPU-bound: 7 regex substitutions, character-by-character iteration for depth-tracking, encode/decode. Python's GIL blocks parallel execution of CPU-bound code in threads. multiprocessing.Pool with 12 workers (= core count) bypasses GIL via separate processes.
with Pool(processes=12, initializer=_init_worker, initargs=(rtf_lookup,)) as pool:
    results = pool.map(convert_one, batch_ids, chunksize=50)
chunksize=50: balances between IPC overhead (task transfer between processes) and granularity. At chunksize=1, IPC overhead dominates. At chunksize=1000, one slow file blocks the entire chunk.
I/O Pattern: scandir Instead of stat
On an HDD with 15M+ files, os.stat() is a bottleneck. Each stat() is a separate I/O seek on a spinning disk. At 15M files, that is ~4 hours just for stat().
# Single scandir pass - build lookup O(n)
rtf_lookup: dict[int, Path] = {}
for entry in os.scandir(rtf_dir):  # readdir, no stat()
    if entry.name.endswith('.rtf'):
        doc_id = int(entry.name[:-4])
        rtf_lookup[doc_id] = rtf_dir / entry.name
os.scandir() calls system-level readdir(), which returns filenames without stat(). This is one sequential directory read instead of 15M random seeks.
Idempotent Upsert via Temp Table
A critical pattern for any data pipeline at scale:
CREATE TEMP TABLE _ft_tmp (doc_id bigint, full_text text);
COPY _ft_tmp FROM stdin;              -- bulk load into temp
INSERT INTO edrsr_fulltext(doc_id, full_text)
SELECT doc_id, full_text FROM _ft_tmp
ON CONFLICT (doc_id) DO NOTHING;      -- idempotent: duplicates ignored
DROP TABLE _ft_tmp;
Why not COPY directly into edrsr_fulltext? COPY does not support ON CONFLICT. If a batch contains a doc_id that already exists, the entire COPY fails on the primary key violation. Temp table + INSERT ON CONFLICT is a staging area with deduplication.
Why not INSERT ... ON CONFLICT DO UPDATE? DO NOTHING is cheaper: it does not generate WAL for unchanged rows. Texts do not change after initial import, so UPDATE is unnecessary.
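To make the idempotency property concrete, here is a small self-contained demonstration of the same staging pattern. It is an analogy only: SQLite (via the standard-library `sqlite3` module) stands in for PostgreSQL, `executemany` stands in for COPY, and the function name `upsert_batch` is hypothetical. The `WHERE true` clause is a SQLite parsing requirement for `INSERT ... SELECT ... ON CONFLICT` and is not needed in PostgreSQL.

```python
import sqlite3


def upsert_batch(conn: sqlite3.Connection, rows: list) -> int:
    """Stage rows in a temp table, then insert only doc_ids not yet present."""
    cur = conn.cursor()
    cur.execute("CREATE TEMP TABLE _ft_tmp (doc_id INTEGER, full_text TEXT)")
    cur.executemany("INSERT INTO _ft_tmp VALUES (?, ?)", rows)  # stand-in for COPY
    cur.execute(
        "INSERT INTO edrsr_fulltext (doc_id, full_text) "
        "SELECT doc_id, full_text FROM _ft_tmp WHERE true "
        "ON CONFLICT (doc_id) DO NOTHING"
    )
    inserted = cur.rowcount          # rows actually inserted, conflicts excluded
    cur.execute("DROP TABLE _ft_tmp")
    conn.commit()
    return inserted


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE edrsr_fulltext (doc_id INTEGER PRIMARY KEY, full_text TEXT)")
batch = [(101, "first decision"), (102, "second decision")]
first_run = upsert_batch(conn, batch)    # loads both rows
second_run = upsert_batch(conn, batch)   # replay of the same batch: no new rows
```

Replaying a batch leaves the table unchanged, which is the behavior that lets any stage of the pipeline be restarted safely.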
Checking Already Imported
Before conversion, the script fetches existing doc_ids:
SELECT doc_id FROM edrsr_fulltext WHERE doc_id BETWEEN {min_id} AND {max_id};
to_import = sorted(set(rtf_lookup.keys()) - existing)
The set difference in Python is O(n); the final sort adds O(n log n). For 30M doc_ids this is ~2 GB memory (64 bytes per int in set), which is acceptable.
Stage 3: Monitoring and PostgreSQL Shared Memory
When importing millions of records, you need observability. We built an admin page with cross-environment aggregation:
- KPI cards: total metadata, total fulltext, coverage %
- Per-year table with progress bars
- Data from local, stage, prod (via /api/internal/edrsr-stats)
- Auto-refresh every 30 seconds
Incident: PG Error 53100
could not resize shared memory segment – No space left on device
Root cause. A query with LEFT JOIN edrsr_documents (45M) × edrsr_fulltext and GROUP BY EXTRACT(YEAR FROM adjudication_date) required a hash join. When the plan is parallel, PostgreSQL allocates the shared hash table in dynamic shared memory, which inside a container is backed by /dev/shm. With work_mem=256MB, a single such operation exceeded the container's entire shm_size (Docker default: 64 MB).
Auto-refresh frontend every 30s = ~120 such queries/hr. Each one – a potential OOM on shared memory.
Three layers of defense:
1. Query decomposition. Instead of one JOIN – two separate COUNTs:
-- Query 1: metadata counts
SELECT EXTRACT(YEAR FROM adjudication_date)::int AS year,
       COUNT(*)::int AS total
FROM edrsr_documents
GROUP BY year;
-- Query 2: fulltext counts
SELECT EXTRACT(YEAR FROM d.adjudication_date)::int AS year,
       COUNT(f.doc_id)::int AS with_fulltext
FROM edrsr_documents d
LEFT JOIN edrsr_fulltext f ON f.doc_id = d.doc_id
GROUP BY year;
Merge happens in Node.js. Each query works with a smaller hash table.
2. work_mem throttling. SET LOCAL work_mem='32MB' inside a transaction. 32 MB instead of 256 MB – 8x less pressure on shared memory. SET LOCAL resets after the transaction, does not affect other connections.
3. In-memory cache (TTL 5 min). Node.js Map with timestamp. Identical responses served from cache. 120 queries/hr → 12 queries/hr.
Safety net: shm_size: 2g in Docker Compose. Not a fix, but insurance.
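The application-side parts of these defenses (merging the two result sets and the TTL cache) can be sketched as below. The production code is Node.js; Python is used here only for consistency with the pipeline scripts, and the names `merge_year_counts` and `TTLCache` are hypothetical.

```python
import time
from typing import Any, Callable, Dict, List, Tuple


def merge_year_counts(totals: Dict[int, int], with_fulltext: Dict[int, int]) -> List[dict]:
    """Combine the two per-year query results into rows with a coverage percentage."""
    merged = []
    for year in sorted(totals):
        total = totals[year]
        ft = with_fulltext.get(year, 0)
        pct = round(100.0 * ft / total, 1) if total else 0.0
        merged.append({"year": year, "total": total, "with_fulltext": ft, "coverage_pct": pct})
    return merged


class TTLCache:
    """Serve a cached value until it is older than ttl_seconds (default 5 min)."""

    def __init__(self, ttl_seconds: float = 300.0, clock: Callable[[], float] = time.monotonic):
        self.ttl = ttl_seconds
        self.clock = clock
        self._store: Dict[str, Tuple[float, Any]] = {}

    def get_or_compute(self, key: str, compute: Callable[[], Any]) -> Any:
        now = self.clock()
        hit = self._store.get(key)
        if hit is not None and now - hit[0] < self.ttl:
            return hit[1]                # fresh: the database is not touched
        value = compute()                # stale or missing: run the queries once
        self._store[key] = (now, value)
        return value
```

With a 30-second refresh and a 5-minute TTL, only one request in ten reaches PostgreSQL, which matches the 120 → 12 queries/hr reduction.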
Sharding Architecture: 4 Databases in One PostgreSQL
Capacity Planning
60M rows × ~4.7 KB average size (text + overhead) = ~283 GB
EC2 t3.xlarge: 4 vCPU, 16 GB RAM, EBS gp3
shared_buffers = 4 GB (25% RAM)
effective_cache_size = 12 GB
283 GB of data with 4 GB shared_buffers means the buffer cache can hold only ~1.4% of the data at a time. For sequential scans (VACUUM, ANALYZE), this is acceptable. For point lookups by doc_id (PK), the B-tree index of ~2.8 GB fits in shared_buffers.
Single-database problem: pg_dump on 283 GB takes ~4 hours. If it fails at 90% – start over. VACUUM FULL on a 283 GB table requires double the disk space (566 GB). autovacuum on 60M rows with a high dead tuple ratio can run for hours.
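The capacity figures above follow from simple arithmetic, reproduced here as a worked example. All inputs are the numbers quoted in this section; decimal units (1 GB = 1,000,000 KB) are an assumption.

```python
rows = 60_000_000
avg_row_kb = 4.7                      # text + per-row overhead
total_gb = rows * avg_row_kb / 1_000_000          # ~282 GB, quoted as ~283 GB

data_gb = 283
shared_buffers_gb = 4
cached_fraction = shared_buffers_gb / data_gb     # ~0.014, i.e. ~1.4% of the data

vacuum_full_peak_gb = 2 * data_gb                 # old + rewritten copy: 566 GB

print(f"total ~{total_gb:.0f} GB, cache covers {cached_fraction:.1%}, "
      f"VACUUM FULL peak {vacuum_full_peak_gb} GB")
```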
Sharding Strategy
Application-level sharding by doc_id ranges. 4 separate databases in one PostgreSQL container:
| Shard | Database | doc_id Range | Rows | Size | Backup Time |
|-------|----------|--------------|------|------|-------------|
| S1 | secondlayer_prod | < 112M | ~24M | 146 GB | ~90 min |
| S2 | secondlayer_prod_ft2 | 112M–150M | ~26M | 101 GB | ~60 min |
| S3 | secondlayer_prod_ft3 | 150M–175M | ~8M | 27 GB | ~15 min |
| S4 | secondlayer_prod_ft4 | > 175M | ~2M | 8 GB | ~2 min |
Why not native partitioning? Declarative range partitions would solve the VACUUM problem (each partition is a separate heap), but NOT pg_dump: all partitions live in one database, and dump/restore operates at the database level. With separate databases – 4 independent pg_dump | pg_restore in parallel.
Why not Citus? Citus requires coordinator + workers (minimum 2 nodes) or a managed service. Our access pattern – point lookups by doc_id – does not need distributed query planning. Also, Citus does not provide independent backup domains.
Why not FDW (Foreign Data Wrappers)? We considered postgres_fdw for transparent cross-shard queries. Rejected: postgres_fdw adds latency (~2ms overhead per query), does not support pushdown for all operations, and complicates backup (by default pg_dump dumps only the definitions of foreign tables, not their data).
Query Routing
Sharding key is doc_id (BIGINT). Monotonically increasing, so range sharding is natural:
doc_id < 112,000,000 → secondlayer_prod (S1)
112M ≤ doc_id < 150,000,000 → secondlayer_prod_ft2 (S2)
150M ≤ doc_id < 175,000,000 → secondlayer_prod_ft3 (S3)
doc_id ≥ 175,000,000 → secondlayer_prod_ft4 (S4)
The backend routes at the connection pool level: 4 PgBouncer pools, each targeting its own database. For a new shard – add database, pool, and update the range map.
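The range map above reduces to a binary search over the shard boundaries. A minimal sketch, assuming hypothetical names `SHARD_BOUNDS`, `shard_for`, and `group_by_shard`; the database names and boundaries are the ones listed in the table:

```python
from bisect import bisect_right
from collections import defaultdict
from typing import Dict, Iterable, List

# Upper-exclusive boundaries between shards, in ascending order.
SHARD_BOUNDS = [112_000_000, 150_000_000, 175_000_000]
SHARD_DBS = [
    "secondlayer_prod",      # S1: doc_id < 112M
    "secondlayer_prod_ft2",  # S2: 112M <= doc_id < 150M
    "secondlayer_prod_ft3",  # S3: 150M <= doc_id < 175M
    "secondlayer_prod_ft4",  # S4: doc_id >= 175M
]


def shard_for(doc_id: int) -> str:
    """Return the database that owns doc_id."""
    return SHARD_DBS[bisect_right(SHARD_BOUNDS, doc_id)]


def group_by_shard(doc_ids: Iterable[int]) -> Dict[str, List[int]]:
    """Split a batch of ids so each shard receives one query."""
    groups: Dict[str, List[int]] = defaultdict(list)
    for doc_id in doc_ids:
        groups[shard_for(doc_id)].append(doc_id)
    return dict(groups)
```

Adding a fifth shard would then mean appending one boundary and one database name, in line with the "update the range map" step.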
Monitoring: endpoint /api/internal/edrsr-stats collects counts from all shards via pg_class.reltuples (approximate count, O(1)) instead of COUNT(*) (sequential scan, O(n)).
Trade-offs
| Aspect | Pro | Con |
|--------|-----|-----|
| Backup | Independent per-shard (ft4 = 2 min) | 4 separate cron jobs |
| VACUUM | Parallel, smaller tables | 4 autovacuum workers |
| Queries | Point lookup O(log n) | Cross-shard JOIN only in Node.js |
| Connections | Isolated pools | 4× connection overhead in PgBouncer |
| Ops | Can drop/rebuild one shard | Manual range management |
Copying to Production: Two-Phase ETL
Transferring 60M rows (283 GB) from local PG to 4 production shards over the network is a separate engineering challenge. The script copy-fulltext-to-prod.py implements a two-phase approach.
Phase 1: Export (sequential read → TSV chunks)
# Single streaming COPY from local PG -> TSV files on NVMe
export_sql = (
    "\\COPY (SELECT doc_id, full_text FROM edrsr_fulltext "
    f"WHERE {where} ORDER BY doc_id) TO STDOUT WITH (FORMAT text)"
)
proc = subprocess.Popen(LOCAL_CMD + ["-c", export_sql], stdout=subprocess.PIPE)
line_count = 0
for line in proc.stdout:              # streaming, no memory accumulation
    current_file.write(line)
    line_count += 1
    if line_count >= chunk_size:      # default 5000 rows
        rotate_to_next_chunk()        # closes this chunk and opens the next one
        line_count = 0
Why TSV, not CSV? COPY text format (TSV) is PostgreSQL's native format. No CSV parsing needed on the receiving end. Simpler escaping: tab-separated, backslash-escaping.
Why chunk files, not a pipe? Resumability. If the network drops at 70% of upload – restart picks up unsent chunks. Each chunk = atomic unit of work.
I/O pattern: Sequential read from local PG (NVMe) → sequential write to /tmp/edrsr-ft-chunks/. Single stream, no disk contention.
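The chunk rotation itself can be sketched as a small standalone function. The name `write_chunks`, the file naming scheme, and the `.part` suffix with a final rename are assumptions added for illustration; the rename ensures an interrupted export never leaves a half-written chunk that the upload phase could pick up.

```python
from pathlib import Path
from typing import Iterable, List


def write_chunks(lines: Iterable[bytes], out_dir: Path, chunk_size: int = 5000) -> List[Path]:
    """Stream COPY output lines into files of at most chunk_size rows each."""
    out_dir.mkdir(parents=True, exist_ok=True)
    finished: List[Path] = []
    handle, part_path, rows = None, None, 0

    def close_chunk() -> None:
        nonlocal handle, part_path, rows
        handle.close()
        final_path = part_path.with_suffix("")  # chunk_000000.tsv.part -> chunk_000000.tsv
        part_path.rename(final_path)            # publish only complete chunks
        finished.append(final_path)
        handle, part_path, rows = None, None, 0

    for line in lines:                          # one COPY text-format line = one row
        if handle is None:
            part_path = out_dir / f"chunk_{len(finished):06d}.tsv.part"
            handle = part_path.open("wb")
        handle.write(line)
        rows += 1
        if rows >= chunk_size:
            close_chunk()
    if handle is not None:                      # last, possibly short, chunk
        close_chunk()
    return finished
```

Counting lines is safe because COPY's text format escapes embedded newlines, so every physical line is exactly one row.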
Phase 2: Upload (parallel workers → prod PG)
with Pool(processes=200) as pool:  # 200 parallel workers, each running psql
    results = list(pool.imap_unordered(upload_chunk, chunk_files, chunksize=1))
Each worker:
- Reads a TSV chunk from disk (~5,000 rows, ~25 MB)
- Constructs SQL: CREATE TEMP TABLE → COPY FROM STDIN → INSERT ON CONFLICT → DROP TABLE
- Executes via subprocess.run(["psql", "-h", prod_host, ...])
- Parses stdout for INSERT 0 N to count copied rows
- Deletes chunk file after success
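The per-chunk script and the output parsing can be sketched as two pure functions. The names `build_chunk_script` and `parse_inserted` are hypothetical; wrapping the statements in BEGIN/COMMIT and using SET LOCAL are assumptions made so that the temp table and the timeout stay within one transaction under PgBouncer's transaction mode.

```python
import re

UPSERT_SQL = (
    "INSERT INTO edrsr_fulltext(doc_id, full_text) "
    "SELECT doc_id, full_text FROM _ft_tmp "
    "ON CONFLICT (doc_id) DO NOTHING;"
)


def build_chunk_script(tsv: str) -> str:
    """Build a psql script that loads one TSV chunk idempotently."""
    if not tsv.endswith("\n"):
        tsv += "\n"
    return (
        "BEGIN;\n"
        "SET LOCAL lock_timeout = '5min';\n"
        "CREATE TEMP TABLE _ft_tmp (doc_id bigint, full_text text);\n"
        "COPY _ft_tmp FROM STDIN;\n"
        f"{tsv}"
        "\\.\n"                      # end-of-data marker for inline COPY input
        f"{UPSERT_SQL}\n"
        "DROP TABLE _ft_tmp;\n"
        "COMMIT;\n"
    )


# psql prints one command tag per statement; the INSERT tag carries the row count.
INSERT_TAG = re.compile(r"^INSERT 0 (\d+)$", re.MULTILINE)


def parse_inserted(stdout: str) -> int:
    """Extract N from the 'INSERT 0 N' command tag."""
    match = INSERT_TAG.search(stdout)
    if match is None:
        raise ValueError("no INSERT command tag in psql output")
    return int(match.group(1))
```

A worker would pass the script to psql on standard input, for example with `subprocess.run(..., input=script, text=True, capture_output=True)`, and treat the difference between the chunk's row count and the parsed N as "skipped (already exist)".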
Why psql subprocess, not psycopg2? Python GIL. 200 threads with psycopg2 would serialize on GIL when processing network buffers. 200 subprocesses are 200 separate processes, each with its own TCP connection. Full network throughput utilization.
SET lock_timeout = '5min' on each chunk: protection against a worker waiting indefinitely on a lock during concurrent INSERTs into the same shard.
Resumability: Chunks are deleted only after a successful INSERT. --skip-export allows restarting only the upload phase from existing chunks. --resume-from-doc-id allows exporting new data and appending to existing chunks.
Progress: every 200 chunks: copied, skipped (already exist), errors, rows/s, ETA.
Worker Pool Sizing: Why 200?
Production PostgreSQL: max_connections=500, PgBouncer in transaction mode. 200 workers = 200 concurrent connections. Each worker holds a connection for ~2-5 seconds (COPY + INSERT). At 200 workers and chunk_size=5000: throughput ~100K-200K rows/s, depending on network latency.
500 workers – oversaturation: PG starts throttling on lock contention (concurrent INSERT into the same index). 100 workers – network underutilization. 200 – empirical optimum for our EC2 t3.xlarge.
Data Quality
| Metric | Value |
|--------|-------|
| RTF conversion accuracy | 99.5% (manual validation, n=1,000) |
| Coverage by year (2021-2026) | 94-97% |
| Gaps | 3-6%: documents without RTF (metadata only) |
| Duplicates | 0 (ON CONFLICT DO NOTHING) |
| Encoding errors | <0.1% (surrogate replacement) |
3-6% gap – documents for which EDRSR does not publish full text (closed proceedings, decisions with restricted access under the Law on the Judiciary and Status of Judges).
Results
| Metric | Value |
|--------|-------|
| Full texts in production | ~60,000,000 |
| Shards | 4 (one PG instance, EC2 t3.xlarge) |
| Total size | 283 GB (EBS gp3) |
| Indexes (B-tree PK) | ~2.8 GB per shard |
| Backup S4 (8 GB) | ~2 min |
| Backup S1 (146 GB) | ~90 min |
| Download workers | 100 (asyncio) |
| Conversion workers | 12 (multiprocessing) |
| Production copy workers | 200 (subprocess) |
| Pipeline idempotent | Yes (ON CONFLICT DO NOTHING + file-level resume) |
What Is Next
Full texts are raw material for two subsequent layers:
Vector embeddings. 60M × 1536 dim (text-embedding-ada-002) = ~350 GB in Qdrant. This requires a batch-embedding pipeline with rate limiting (OpenAI TPM), chunking of long texts, and an incremental update strategy.
Semantic sectioning. Splitting decisions into logical sections (reasoning, operative part, dissenting opinion) for more precise search. SemanticSectionizer already works for individual documents, but batch-processing 60M is a separate challenge.
Open data is not a compromise. It is an architectural decision. 60 million full texts, 283 GB across 4 shards, an idempotent pipeline with zero tolerance for data loss – all built on public sources, with no dependency on commercial APIs.