EDRSR: як ми імпортували мільйони судових рішень з держреєстру
60 мільйонів повних текстів. 283 ГБ на 4 шардах. Кастомний RTF-парсер з depth-tracking для Windows-1251 кирилиці. Двофазний ETL з idempotent upsert через temp-таблиці. Application-level sharding по doc_id з незалежними backup domains. PostgreSQL shared memory exhaustion і три рівні захисту. Все на відкритих даних ЄДРСР.
EDRSR: data pipeline для 60 мільйонів судових рішень
Архітектура ETL-системи, яка переносить весь Єдиний державний реєстр судових рішень у 4-шардову PostgreSQL-інфраструктуру -- від моделі даних і RTF-парсингу до capacity planning і операційних trade-offs.
Контекст задачі
LEX AI -- платформа семантичного пошуку по судовій практиці. Ядро пошуку -- векторні ембедінги (text-embedding-ada-002, 1536 dim), які генеруються з повних текстів рішень. Без тексту немає ембедінгів, без ембедінгів немає семантичного пошуку.
ЄДРСР (Єдиний державний реєстр судових рішень) -- це ~60M документів від 685 судів усіх інстанцій, з 2006 року по сьогодні. Повні тексти зберігаються у форматі RTF з кодуванням Windows-1251.
Масштаб задачі:
| Параметр | Значення |
|---|---|
| Документів у реєстрі | ~60,000,000 |
| Середній розмір RTF | ~4.5 КБ |
| Середній розмір plaintext | ~2.3 КБ |
| Сумарний обсяг тексту | 283 ГБ (PostgreSQL) |
| Судів-джерел | 685 |
| Часовий діапазон | 2006--2026 |
Принципове рішення: тільки відкриті дані
Ми свідомо обрали працювати виключно з відкритими джерелами. Портал reyestr.court.gov.ua публікує судові рішення у відкритому доступі -- це публічна інформація за Законом України «Про доступ до публічної інформації».
Причина не тільки етична. Комерційні API мають операційні ризики: rate limits, блокування токенів при bulk-завантаженні, залежність від третьої сторони. Конкретний інцидент: bulk-завантаження court_sessions (~35K запитів за 2.7 години) призвело до блокування обох API-токенів ZakonOnline, що вивело з ладу продакшн-чат.
| Джерело | Що отримуємо | Модель доступу |
|---|---|---|
| reyestr.court.gov.ua | Повні тексти у RTF | HTTP GET, rate-limited, безкоштовно |
| data.gov.ua | Метадані (CSV dumps) | Bulk download, оновлення щодня |
| Комерційні API | Те саме + JSON | REST API, платно, токени блокуються |
Модель даних
Перед тим як говорити про pipeline, варто зрозуміти цільову схему. Ми розділили метадані і повні тексти у дві окремі таблиці -- це ключове архітектурне рішення.
Метадані: edrsr_documents
CREATE TABLE edrsr_documents (
doc_id BIGINT PRIMARY KEY, -- PK з ЄДРСР, автоінкремент
court_code INTEGER, -- FK на edrsr_courts (без constraint)
judgment_code SMALLINT, -- тип рішення (вирок, ухвала, постанова)
justice_kind SMALLINT, -- вид судочинства
category_code INTEGER, -- категорія справи (4106 категорій)
cause_num TEXT, -- номер справи
adjudication_date TIMESTAMPTZ, -- дата винесення
receipt_date TIMESTAMPTZ, -- дата надходження до реєстру
judge TEXT, -- суддя/колегія
doc_url TEXT, -- URL на RTF у реєстрі
status SMALLINT DEFAULT 0,
date_publ TIMESTAMPTZ
);
Навмисна відсутність FK constraints. Джерельні дані з data.gov.ua містять court_code, justice_kind, category_code, які не завжди присутні в довідникових таблицях. З FK constraints імпорт ламається на кожному «брудному» рядку. Без них -- ми імпортуємо все, а валідацію робимо на рівні запитів.
Чому doc_id BIGINT, а не UUID? doc_id -- це натуральний ключ з ЄДРСР (автоінкремент). Він монотонно зростає, що дає ідеальний B-tree з мінімальною фрагментацією при послідовному імпорті. UUID дав би випадкові вставки по всьому індексу -- на 60M рядків це суттєва різниця в I/O.
8 індексів на типові паттерни запитів: court_code, justice_kind, judgment_code, category_code, cause_num, judge, adjudication_date, receipt_date. Кожен обґрунтований реальним use case (фільтрація по суду, по виду судочинства, пошук по номеру справи).
Повні тексти: edrsr_fulltext
CREATE TABLE edrsr_fulltext (
doc_id BIGINT PRIMARY KEY, -- join key до edrsr_documents
full_text TEXT, -- plaintext після RTF-конвертації
text_length INTEGER, -- pre-computed для фільтрації
created_at TIMESTAMP DEFAULT NOW()
);
Чому окрема таблиця, а не колонка в edrsr_documents? Три причини:
TOAST-сегментація. PostgreSQL зберігає TEXT > 2 КБ в окремих TOAST-сторінках. Якщо full_text лежить у тій же таблиці, що й метадані, то
SELECT court_code, cause_num FROM edrsr_documentsвсе одно торкатиметься TOAST-сторінок при sequential scan. Окрема таблиця = чистий sequential scan по метаданих без overhead.Різні lifecycle. Метадані імпортуються з CSV-дампів data.gov.ua (щоденне оновлення). Повні тексти завантажуються з reyestr.court.gov.ua (одноразовий bulk + incremental). Різні джерела, різні скрипти, різна частота.
Незалежний шардинг. Повні тексти займають 283 ГБ проти ~12 ГБ метаданих. Шардити потрібно тільки тексти, метадані лишаються в одній базі.
Довідники
5 довідникових таблиць: courts (685), instances (3), regions (27), justice_kinds (5), judgment_forms (10+), cause_categories (4106). Імпортуються один раз, оновлюються рідко.
Архітектура pipeline
Pipeline реалізований як 4 незалежні Python-скрипти. Кожен idempotent -- можна перезапускати без втрати даних і дублікатів.
┌─────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ ┌──────────────────────┐
│ 1. Download RTF │ │ 2. Import from HDD │ │ 3. Monitor │ │ 4. Copy to Prod │
│ │ │ │ │ │ │ │
│ asyncio + aiohttp │───▶│ multiprocessing │───▶│ PG aggregate │───▶│ 2-phase ETL │
│ 100 workers │ │ 12 CPU workers │ │ + in-mem cache │ │ 200 psql workers │
│ 3 retries + backoff│ │ COPY FROM STDIN │ │ cross-env stats │ │ TSV chunks on NVMe │
│ │ │ ON CONFLICT NOTHING │ │ │ │ ON CONFLICT NOTHING │
│ reyestr.court.gov │ │ HDD → PG local │ │ local/stage/prod│ │ PG local → PG prod │
│ → /tmp/edrsr-rtf/ │ │ 18 TB /dev/sda1 │ │ │ │ per-shard routing │
└─────────────────────┘ └──────────────────────┘ └──────────────────┘ └──────────────────────┘
Етап 1: Завантаження RTF
I/O-модель: async HTTP GET → disk write. Network-bound задача, тому asyncio + aiohttp з TCPConnector(limit=100, limit_per_host=100).
semaphore = asyncio.Semaphore(100) # 100 concurrent downloads
# Retry: 3 attempts, exponential backoff (2s, 4s, 6s)
# 429 handling: sleep 5 * (attempt + 1) seconds
Resumability. Перед завантаженням перевіряємо outpath.exists() and outpath.stat().st_size > 0. Якщо файл вже є і не порожній -- пропускаємо. Це дозволяє перезапускати скрипт без повторного завантаження.
Файлова конвенція: {doc_id}.rtf -- doc_id є ім'ям файлу. Це дає O(1) lookup без бази метаданих: int(filename[:-4]) → doc_id.
RTF-парсер: чому кастомний
RTF з ЄДРСР -- не звичайний RTF. Це Windows-1251 кирилиця, закодована як \\'XX escape-послідовності всередині latin1-обгортки. Стандартні бібліотеки (striprtf, pyrtf-ng) не розрізняють Windows-1251 та latin1 байти і ламають кирилицю.
Наш парсер працює в 7 кроків:
1. raw bytes → latin1 decode (RTF envelope)
2. Remove nested groups: {\fonttbl ...}, {\colortbl ...},
{\stylesheet ...}, {\info ...}, {\*\ ...}
(depth-tracking brace parser, O(n))
3. Strip \rtf1 header
4. \par → \n, \line → \n, \tab → \t
5. \\'XX → Windows-1251 byte decode
6. \uNNNNN → chr(code), range check 0..0x10FFFF
7. Strip remaining \keyword sequences
8. Remove braces, null bytes, normalize newlines
9. UTF-8 surrogate cleanup: encode('utf-8', errors='surrogatepass')
.decode('utf-8', errors='replace')
Depth-tracking для вкладених груп. RTF-група {\fonttbl {\f0 Times;}} може мати довільну глибину вкладеності. Парсер відстежує баланс {} і видаляє всю групу від відкриваючої до закриваючої дужки на тому ж рівні. Складність O(n) по довжині документа.
Точність: 99.5% на корпусі ~1000 вручну перевірених документів. 0.5% помилок -- документи з нестандартними RTF-розширеннями (вбудовані зображення, OLE-об'єкти), де текст все одно витягується, але з артефактами.
Етап 2: Масовий імпорт з HDD
Це головний робочий кінь pipeline. Усі RTF-файли лежать на 18 ТБ HDD (/dev/sda1), і скрипт повинен конвертувати їх у текст та завантажити в PostgreSQL.
Чому multiprocessing, а не asyncio? RTF-конвертація -- CPU-bound: 7 regex замін, ітерація по символах для depth-tracking, encode/decode. Python GIL блокує паралельне виконання CPU-bound коду в тредах. multiprocessing.Pool з 12 воркерами (= кількість ядер) обходить GIL через окремі процеси.
Pool(processes=12, initializer=_init_worker, initargs=(rtf_lookup,))
pool.map(convert_one, batch_ids, chunksize=50)
chunksize=50: балансує між overhead на IPC (передача задач між процесами) і granularity. При chunksize=1 IPC overhead домінує. При chunksize=1000 один повільний файл блокує весь чанк.
I/O-паттерн: scandir замість stat
На HDD з 15M+ файлів os.stat() -- bottleneck. Кожен stat() -- окремий I/O seek на шпиндельному диску. При 15M файлів це ~4 години тільки на stat().
# Один прохід scandir -- побудова lookup O(n)
rtf_lookup: dict[int, Path] = {}
for entry in os.scandir(rtf_dir): # readdir, без stat()
if entry.name.endswith('.rtf'):
doc_id = int(entry.name[:-4])
rtf_lookup[doc_id] = rtf_dir / entry.name
os.scandir() викликає readdir() системного рівня, який повертає імена файлів без stat(). Це один sequential read директорії замість 15M random seeks.
Idempotent upsert через temp-таблицю
Критичний патерн для будь-якого data pipeline на великих обсягах:
CREATE TEMP TABLE _ft_tmp (doc_id bigint, full_text text);
COPY _ft_tmp FROM stdin; -- bulk load у тимчасову
INSERT INTO edrsr_fulltext(doc_id, full_text)
SELECT doc_id, full_text FROM _ft_tmp
ON CONFLICT (doc_id) DO NOTHING; -- idempotent: дублікати ігноруються
DROP TABLE _ft_tmp;
Чому не прямий COPY INTO edrsr_fulltext? COPY не підтримує ON CONFLICT. Якщо в batch є doc_id, який вже існує, весь COPY падає. Temp-таблиця + INSERT ON CONFLICT -- це staging area з дедуплікацією.
Чому не INSERT ... ON CONFLICT DO UPDATE? DO NOTHING дешевше: не генерує WAL для незмінених рядків. Тексти не змінюються після першого імпорту, тому UPDATE не потрібен.
Перевірка вже імпортованих
Перед конвертацією скрипт вивантажує existing doc_id:
SELECT doc_id FROM edrsr_fulltext WHERE doc_id BETWEEN {min_id} AND {max_id};
to_import = sorted(set(rtf_lookup.keys()) - existing)
Це set difference на рівні Python -- O(n). Для 30M doc_id це ~2 ГБ пам'яті (64 байти на int у set), що прийнятно.
Етап 3: Моніторинг і PostgreSQL shared memory
Коли імпортуєш мільйони записів, потрібна observability. Ми побудували адмін-сторінку з cross-environment агрегацією:
- KPI-картки: total metadata, total fulltext, coverage %
- Таблиця по роках з progress bars
- Дані з local, stage, prod (через
/api/internal/edrsr-stats) - Auto-refresh кожні 30 секунд
Інцидент: PG error 53100
could not resize shared memory segment -- No space left on device
Root cause. Запит LEFT JOIN edrsr_documents (45M) x edrsr_fulltext з GROUP BY EXTRACT(YEAR FROM adjudication_date) потребував hash join. PostgreSQL алокує hash table у shared memory. З work_mem=256MB одна така операція з'їдала весь shm_size контейнера (Docker default: 64 МБ).
Auto-refresh frontend кожні 30с = ~120 таких запитів/год. Кожен -- потенційний OOM на shared memory.
Три рівні захисту:
1. Query decomposition. Замість одного JOIN -- два окремі COUNT:
-- Query 1: metadata counts
SELECT EXTRACT(YEAR FROM adjudication_date)::int AS year,
COUNT(*)::int AS total FROM edrsr_documents GROUP BY year;
-- Query 2: fulltext counts
SELECT EXTRACT(YEAR FROM d.adjudication_date)::int AS year,
COUNT(f.doc_id)::int AS with_fulltext
FROM edrsr_documents d
LEFT JOIN edrsr_fulltext f ON f.doc_id = d.doc_id GROUP BY year;
Merge відбувається в Node.js. Кожен запит працює з меншим hash table.
2. work_mem throttling. SET LOCAL work_mem='32MB' в транзакції. 32 МБ замість 256 МБ -- 8x менше тиску на shared memory. SET LOCAL скидається після транзакції, не впливає на інші з'єднання.
3. In-memory cache (TTL 5 хв). Node.js Map з timestamp. Ідентичні відповіді віддаються з кешу. 120 запитів/год → 12 запитів/год.
Safety net: shm_size: 2g в Docker Compose. Не фікс, а страховка.
Архітектура шардингу: 4 бази на одному PostgreSQL
Capacity planning
60M рядків × ~4.7 КБ середній розмір (текст + overhead) = ~283 ГБ
EC2 t3.xlarge: 4 vCPU, 16 ГБ RAM, EBS gp3
shared_buffers = 4 ГБ (25% RAM)
effective_cache_size = 12 ГБ
283 ГБ даних при 4 ГБ shared_buffers означає buffer hit ratio ~1.4%. Для sequential scan (VACUUM, ANALYZE) це прийнятно. Для point lookups по doc_id (PK) -- B-tree індекс ~2.8 ГБ поміщається в shared_buffers.
Проблема single-database: pg_dump 283 ГБ -- це ~4 години. Якщо впаде на 90% -- починаєте спочатку. VACUUM FULL на таблиці 283 ГБ -- потрібен подвійний дисковий простір (566 ГБ). autovacuum на 60M рядків з великим dead tuple ratio може працювати годинами.
Стратегія шардингу
Application-level sharding по doc_id ranges. 4 окремі бази в одному PostgreSQL-контейнері:
| Шард | База | Діапазон doc_id | Рядків | Розмір | Backup time |
|---|---|---|---|---|---|
| S1 | secondlayer_prod |
< 112M | ~24M | 146 ГБ | ~90 хв |
| S2 | secondlayer_prod_ft2 |
112M--150M | ~26M | 101 ГБ | ~60 хв |
| S3 | secondlayer_prod_ft3 |
150M--175M | ~8M | 27 ГБ | ~15 хв |
| S4 | secondlayer_prod_ft4 |
> 175M | ~2M | 8 ГБ | ~2 хв |
Чому не нативний partitioning? Declarative range partitions вирішили б проблему VACUUM (кожна partition -- окрема heap), але NOT pg_dump: всі партиції живуть в одній базі, і дамп/рестор працює на рівні бази цілком. З окремими базами -- 4 незалежні pg_dump | pg_restore паралельно.
Чому не Citus? Citus потребує coordinator + workers (мінімум 2 ноди) або managed-сервіс. Наш access pattern -- point lookups по doc_id -- не потребує distributed query planning. Також Citus не дає незалежних backup domains.
Чому не FDW (Foreign Data Wrappers)? Розглядали postgres_fdw для прозорого cross-shard query. Відкинули: fdw додає latency (~2ms overhead на запит), не підтримує pushdown для всіх операцій, і ускладнює backup (fdw-таблиці не дампляться стандартним pg_dump).
Маршрутизація запитів
Ключ шардингу -- doc_id (BIGINT). Монотонно зростає, тому range sharding природний:
doc_id < 112,000,000 → secondlayer_prod (S1)
112M ≤ doc_id < 150,000,000 → secondlayer_prod_ft2 (S2)
150M ≤ doc_id < 175,000,000 → secondlayer_prod_ft3 (S3)
doc_id ≥ 175,000,000 → secondlayer_prod_ft4 (S4)
Backend маршрутизує на рівні connection pool: 4 пули PgBouncer, кожен на свою базу. Для нового шарду -- додати базу, пул, і оновити range map.
Моніторинг: endpoint /api/internal/edrsr-stats збирає count з усіх шардів через pg_class.reltuples (approximate count, O(1)) замість COUNT(*) (sequential scan, O(n)).
Trade-offs
| Аспект | Плюс | Мінус |
|---|---|---|
| Backup | Незалежний per-shard (ft4 = 2 хв) | 4 окремі cron jobs |
| VACUUM | Паралельний, менші таблиці | 4 autovacuum workers |
| Queries | Point lookup O(log n) | Cross-shard JOIN тільки в Node.js |
| Connections | Ізольовані пули | 4× connection overhead в PgBouncer |
| Ops | Можна дропнути/перебудувати один шард | Ручний range management |
Копіювання на продакшн: двофазний ETL
Перенести 60M рядків (283 ГБ) з локального PG на 4 шарди продакшну через мережу -- окрема інженерна задача. Скрипт copy-fulltext-to-prod.py реалізує двофазний підхід.
Фаза 1: Export (sequential read → TSV chunks)
# Один streaming COPY з local PG → TSV-файли на NVMe
export_sql = "\\COPY (SELECT doc_id, full_text FROM edrsr_fulltext "
f"WHERE {where} ORDER BY doc_id) TO STDOUT WITH (FORMAT text)"
proc = subprocess.Popen(LOCAL_CMD + ["-c", export_sql], stdout=PIPE)
for line in proc.stdout: # streaming, без накопичення в пам'яті
current_file.write(line)
if line_count >= chunk_size: # default 5000 рядків
rotate_to_next_chunk()
Чому TSV, а не CSV? COPY text format (TSV) -- native PostgreSQL формат. Не потрібен CSV parsing на стороні прийому. Escaping простіший: tab-separated, backslash-escaping.
Чому chunk files, а не pipe? Resumability. Якщо мережа впаде на 70% uploadu -- restart підбирає невідправлені чанки. Кожен чанк = atomic unit of work.
I/O pattern: Sequential read з local PG (NVMe) → sequential write в /tmp/edrsr-ft-chunks/. Один потік, без конкуренції за диск.
Фаза 2: Upload (parallel workers → prod PG)
Pool(processes=200) # 200 паралельних psql-процесів
pool.imap_unordered(upload_chunk, chunk_files, chunksize=1)
Кожен воркер:
- Читає TSV-чанк з диска (~5000 рядків, ~25 МБ)
- Формує SQL:
CREATE TEMP TABLE→COPY FROM STDIN→INSERT ON CONFLICT→DROP TABLE - Виконує через
subprocess.run(["psql", "-h", prod_host, ...]) - Парсить stdout на
INSERT 0 Nдля підрахунку скопійованих - Видаляє чанк-файл після успіху
Чому psql subprocess, а не psycopg2? Python GIL. 200 тредів з psycopg2 серіалізуються на GIL при обробці мережевих буферів. 200 subprocess -- це 200 окремих процесів, кожен з власним TCP-з'єднанням. Повна утилізація мережевої пропускної здатності.
SET lock_timeout = '5min' на кожному чанку -- захист від deadlock при конкурентних INSERT в один шард.
Resumability: Чанки видаляються тільки після успішного INSERT. --skip-export дозволяє перезапустити тільки фазу upload з наявних чанків. --resume-from-doc-id дозволяє доекспортувати нові дані до існуючих чанків.
Прогрес: кожні 200 чанків: copied, skipped (already exist), errors, rows/s, ETA.
Розмір воркер-пулу: чому 200?
Продакшн PostgreSQL: max_connections=500, PgBouncer у transaction mode. 200 воркерів = 200 concurrent connections. Кожен воркер тримає з'єднання ~2-5 секунд (COPY + INSERT). При 200 workers і chunk_size=5000: throughput ~100K-200K rows/s, залежно від мережевої латентності.
500 воркерів -- oversaturation: PG починає тротлити на lock contention (concurrent INSERT в той самий індекс). 100 воркерів -- недовантаження мережі. 200 -- емпіричний оптимум для нашого EC2 t3.xlarge.
Data quality
| Метрика | Значення |
|---|---|
| RTF-конвертація: точність | 99.5% (manual validation, n=1000) |
| Покриття по роках (2021-2026) | 94-97% |
| Gaps | 3-6% -- документи без RTF (тільки метадані) |
| Дублікати | 0 (ON CONFLICT DO NOTHING) |
| Encoding errors | <0.1% (surrogate replacement) |
3-6% gap -- це документи, для яких ЄДРСР не публікує повний текст (закриті провадження, рішення з обмеженим доступом за ЗУ «Про судоустрій та статус суддів»).
Результати
| Метрика | Значення |
|---|---|
| Повних текстів на проді | ~60,000,000 |
| Шардів | 4 (одна PG інстанція, EC2 t3.xlarge) |
| Загальний розмір | 283 ГБ (EBS gp3) |
| Індекси (B-tree PK) | ~2.8 ГБ per shard |
| Backup S4 (8 ГБ) | ~2 хв |
| Backup S1 (146 ГБ) | ~90 хв |
| Воркерів завантаження | 100 (asyncio) |
| Воркерів конвертації | 12 (multiprocessing) |
| Воркерів продакшн-копії | 200 (subprocess) |
| Pipeline idempotent | Так (ON CONFLICT DO NOTHING + file-level resume) |
Що далі
Повні тексти -- це сировина для двох наступних шарів:
Векторні ембедінги. 60M × 1536 dim (text-embedding-ada-002) = ~350 ГБ у Qdrant. Це потребує batch-embedding pipeline з rate limiting (OpenAI TPM), chunking довгих текстів, та incremental update strategy.
Semantic sectioning. Розбиття рішень на логічні секції (мотивувальна частина, резолютивна частина, окрема думка) для точнішого пошуку. SemanticSectionizer вже працює для окремих документів, але batch-обробка 60M -- окремий виклик.
Відкриті дані -- це не компроміс. Це архітектурне рішення. 60 мільйонів повних текстів, 283 ГБ на 4 шардах, idempotent pipeline з нульовою толерантністю до втрати даних -- і все побудовано на публічних джерелах, без залежності від комерційних API.