LEX — AI Legal Platform for Law Firms

AI-powered legal analysis platform for law firms and corporate counsel.

Features

Resources

Blog Articles

Technology

Built on AWS (EC2, Bedrock Claude AI, ALB, WAF, S3, ACM, KMS). PostgreSQL, Redis, Qdrant vector database. TypeScript, React, Node.js.

Start free — 50 credits on registration. Sign up

TECH 15 хв

EDRSR: як ми імпортували мільйони судових рішень з держреєстру

60 мільйонів повних текстів. 283 ГБ на 4 шардах. Кастомний RTF-парсер з depth-tracking для Windows-1251 кирилиці. Двофазний ETL з idempotent upsert через temp-таблиці. Application-level sharding по doc_id з незалежними backup domains. PostgreSQL shared memory exhaustion і три рівні захисту. Все на відкритих даних ЄДРСР.

EDRSR: data pipeline для 60 мільйонів судових рішень

Архітектура ETL-системи, яка переносить весь Єдиний державний реєстр судових рішень у 4-шардову PostgreSQL-інфраструктуру -- від моделі даних і RTF-парсингу до capacity planning і операційних trade-offs.


Контекст задачі

LEX AI -- платформа семантичного пошуку по судовій практиці. Ядро пошуку -- векторні ембедінги (text-embedding-ada-002, 1536 dim), які генеруються з повних текстів рішень. Без тексту немає ембедінгів, без ембедінгів немає семантичного пошуку.

ЄДРСР (Єдиний державний реєстр судових рішень) -- це ~60M документів від 685 судів усіх інстанцій, з 2006 року по сьогодні. Повні тексти зберігаються у форматі RTF з кодуванням Windows-1251.

Масштаб задачі:

Параметр Значення
Документів у реєстрі ~60,000,000
Середній розмір RTF ~4.5 КБ
Середній розмір plaintext ~2.3 КБ
Сумарний обсяг тексту 283 ГБ (PostgreSQL)
Судів-джерел 685
Часовий діапазон 2006--2026

Принципове рішення: тільки відкриті дані

Ми свідомо обрали працювати виключно з відкритими джерелами. Портал reyestr.court.gov.ua публікує судові рішення у відкритому доступі -- це публічна інформація за Законом України «Про доступ до публічної інформації».

Причина не тільки етична. Комерційні API мають операційні ризики: rate limits, блокування токенів при bulk-завантаженні, залежність від третьої сторони. Конкретний інцидент: bulk-завантаження court_sessions (~35K запитів за 2.7 години) призвело до блокування обох API-токенів ZakonOnline, що вивело з ладу продакшн-чат.

Джерело Що отримуємо Модель доступу
reyestr.court.gov.ua Повні тексти у RTF HTTP GET, rate-limited, безкоштовно
data.gov.ua Метадані (CSV dumps) Bulk download, оновлення щодня
Комерційні API Те саме + JSON REST API, платно, токени блокуються

Модель даних

Перед тим як говорити про pipeline, варто зрозуміти цільову схему. Ми розділили метадані і повні тексти у дві окремі таблиці -- це ключове архітектурне рішення.

Метадані: edrsr_documents

CREATE TABLE edrsr_documents (
  doc_id       BIGINT PRIMARY KEY,   -- PK з ЄДРСР, автоінкремент
  court_code   INTEGER,              -- FK на edrsr_courts (без constraint)
  judgment_code SMALLINT,            -- тип рішення (вирок, ухвала, постанова)
  justice_kind SMALLINT,             -- вид судочинства
  category_code INTEGER,             -- категорія справи (4106 категорій)
  cause_num    TEXT,                  -- номер справи
  adjudication_date TIMESTAMPTZ,     -- дата винесення
  receipt_date TIMESTAMPTZ,          -- дата надходження до реєстру
  judge        TEXT,                  -- суддя/колегія
  doc_url      TEXT,                  -- URL на RTF у реєстрі
  status       SMALLINT DEFAULT 0,
  date_publ    TIMESTAMPTZ
);

Навмисна відсутність FK constraints. Джерельні дані з data.gov.ua містять court_code, justice_kind, category_code, які не завжди присутні в довідникових таблицях. З FK constraints імпорт ламається на кожному «брудному» рядку. Без них -- ми імпортуємо все, а валідацію робимо на рівні запитів.

Чому doc_id BIGINT, а не UUID? doc_id -- це натуральний ключ з ЄДРСР (автоінкремент). Він монотонно зростає, що дає ідеальний B-tree з мінімальною фрагментацією при послідовному імпорті. UUID дав би випадкові вставки по всьому індексу -- на 60M рядків це суттєва різниця в I/O.

8 індексів на типові паттерни запитів: court_code, justice_kind, judgment_code, category_code, cause_num, judge, adjudication_date, receipt_date. Кожен обґрунтований реальним use case (фільтрація по суду, по виду судочинства, пошук по номеру справи).

Повні тексти: edrsr_fulltext

CREATE TABLE edrsr_fulltext (
  doc_id      BIGINT PRIMARY KEY,  -- join key до edrsr_documents
  full_text   TEXT,                -- plaintext після RTF-конвертації
  text_length INTEGER,             -- pre-computed для фільтрації
  created_at  TIMESTAMP DEFAULT NOW()
);

Чому окрема таблиця, а не колонка в edrsr_documents? Три причини:

  1. TOAST-сегментація. PostgreSQL зберігає TEXT > 2 КБ в окремих TOAST-сторінках. Якщо full_text лежить у тій же таблиці, що й метадані, то SELECT court_code, cause_num FROM edrsr_documents все одно торкатиметься TOAST-сторінок при sequential scan. Окрема таблиця = чистий sequential scan по метаданих без overhead.

  2. Різні lifecycle. Метадані імпортуються з CSV-дампів data.gov.ua (щоденне оновлення). Повні тексти завантажуються з reyestr.court.gov.ua (одноразовий bulk + incremental). Різні джерела, різні скрипти, різна частота.

  3. Незалежний шардинг. Повні тексти займають 283 ГБ проти ~12 ГБ метаданих. Шардити потрібно тільки тексти, метадані лишаються в одній базі.

Довідники

5 довідникових таблиць: courts (685), instances (3), regions (27), justice_kinds (5), judgment_forms (10+), cause_categories (4106). Імпортуються один раз, оновлюються рідко.

Архітектура pipeline

Pipeline реалізований як 4 незалежні Python-скрипти. Кожен idempotent -- можна перезапускати без втрати даних і дублікатів.

┌─────────────────────┐    ┌──────────────────────┐    ┌──────────────────┐    ┌──────────────────────┐
│  1. Download RTF    │    │  2. Import from HDD  │    │  3. Monitor      │    │  4. Copy to Prod     │
│                     │    │                      │    │                  │    │                      │
│  asyncio + aiohttp  │───▶│  multiprocessing     │───▶│  PG aggregate    │───▶│  2-phase ETL         │
│  100 workers        │    │  12 CPU workers      │    │  + in-mem cache  │    │  200 psql workers    │
│  3 retries + backoff│    │  COPY FROM STDIN     │    │  cross-env stats │    │  TSV chunks on NVMe  │
│                     │    │  ON CONFLICT NOTHING  │    │                  │    │  ON CONFLICT NOTHING  │
│  reyestr.court.gov  │    │  HDD → PG local      │    │  local/stage/prod│    │  PG local → PG prod  │
│  → /tmp/edrsr-rtf/  │    │  18 TB /dev/sda1     │    │                  │    │  per-shard routing   │
└─────────────────────┘    └──────────────────────┘    └──────────────────┘    └──────────────────────┘

Етап 1: Завантаження RTF

I/O-модель: async HTTP GET → disk write. Network-bound задача, тому asyncio + aiohttp з TCPConnector(limit=100, limit_per_host=100).

semaphore = asyncio.Semaphore(100)  # 100 concurrent downloads
# Retry: 3 attempts, exponential backoff (2s, 4s, 6s)
# 429 handling: sleep 5 * (attempt + 1) seconds

Resumability. Перед завантаженням перевіряємо outpath.exists() and outpath.stat().st_size > 0. Якщо файл вже є і не порожній -- пропускаємо. Це дозволяє перезапускати скрипт без повторного завантаження.

Файлова конвенція: {doc_id}.rtf -- doc_id є ім'ям файлу. Це дає O(1) lookup без бази метаданих: int(filename[:-4]) → doc_id.

RTF-парсер: чому кастомний

RTF з ЄДРСР -- не звичайний RTF. Це Windows-1251 кирилиця, закодована як \\'XX escape-послідовності всередині latin1-обгортки. Стандартні бібліотеки (striprtf, pyrtf-ng) не розрізняють Windows-1251 та latin1 байти і ламають кирилицю.

Наш парсер працює в 7 кроків:

1. raw bytes → latin1 decode (RTF envelope)
2. Remove nested groups: {\fonttbl ...}, {\colortbl ...},
   {\stylesheet ...}, {\info ...}, {\*\ ...}
   (depth-tracking brace parser, O(n))
3. Strip \rtf1 header
4. \par → \n, \line → \n, \tab → \t
5. \\'XX → Windows-1251 byte decode
6. \uNNNNN → chr(code), range check 0..0x10FFFF
7. Strip remaining \keyword sequences
8. Remove braces, null bytes, normalize newlines
9. UTF-8 surrogate cleanup: encode('utf-8', errors='surrogatepass')
                            .decode('utf-8', errors='replace')

Depth-tracking для вкладених груп. RTF-група {\fonttbl {\f0 Times;}} може мати довільну глибину вкладеності. Парсер відстежує баланс {} і видаляє всю групу від відкриваючої до закриваючої дужки на тому ж рівні. Складність O(n) по довжині документа.

Точність: 99.5% на корпусі ~1000 вручну перевірених документів. 0.5% помилок -- документи з нестандартними RTF-розширеннями (вбудовані зображення, OLE-об'єкти), де текст все одно витягується, але з артефактами.

Етап 2: Масовий імпорт з HDD

Це головний робочий кінь pipeline. Усі RTF-файли лежать на 18 ТБ HDD (/dev/sda1), і скрипт повинен конвертувати їх у текст та завантажити в PostgreSQL.

Чому multiprocessing, а не asyncio? RTF-конвертація -- CPU-bound: 7 regex замін, ітерація по символах для depth-tracking, encode/decode. Python GIL блокує паралельне виконання CPU-bound коду в тредах. multiprocessing.Pool з 12 воркерами (= кількість ядер) обходить GIL через окремі процеси.

Pool(processes=12, initializer=_init_worker, initargs=(rtf_lookup,))
pool.map(convert_one, batch_ids, chunksize=50)

chunksize=50: балансує між overhead на IPC (передача задач між процесами) і granularity. При chunksize=1 IPC overhead домінує. При chunksize=1000 один повільний файл блокує весь чанк.

I/O-паттерн: scandir замість stat

На HDD з 15M+ файлів os.stat() -- bottleneck. Кожен stat() -- окремий I/O seek на шпиндельному диску. При 15M файлів це ~4 години тільки на stat().

# Один прохід scandir -- побудова lookup O(n)
rtf_lookup: dict[int, Path] = {}
for entry in os.scandir(rtf_dir):   # readdir, без stat()
    if entry.name.endswith('.rtf'):
        doc_id = int(entry.name[:-4])
        rtf_lookup[doc_id] = rtf_dir / entry.name

os.scandir() викликає readdir() системного рівня, який повертає імена файлів без stat(). Це один sequential read директорії замість 15M random seeks.

Idempotent upsert через temp-таблицю

Критичний патерн для будь-якого data pipeline на великих обсягах:

CREATE TEMP TABLE _ft_tmp (doc_id bigint, full_text text);
COPY _ft_tmp FROM stdin;            -- bulk load у тимчасову
INSERT INTO edrsr_fulltext(doc_id, full_text)
SELECT doc_id, full_text FROM _ft_tmp
ON CONFLICT (doc_id) DO NOTHING;    -- idempotent: дублікати ігноруються
DROP TABLE _ft_tmp;

Чому не прямий COPY INTO edrsr_fulltext? COPY не підтримує ON CONFLICT. Якщо в batch є doc_id, який вже існує, весь COPY падає. Temp-таблиця + INSERT ON CONFLICT -- це staging area з дедуплікацією.

Чому не INSERT ... ON CONFLICT DO UPDATE? DO NOTHING дешевше: не генерує WAL для незмінених рядків. Тексти не змінюються після першого імпорту, тому UPDATE не потрібен.

Перевірка вже імпортованих

Перед конвертацією скрипт вивантажує existing doc_id:

SELECT doc_id FROM edrsr_fulltext WHERE doc_id BETWEEN {min_id} AND {max_id};
to_import = sorted(set(rtf_lookup.keys()) - existing)

Це set difference на рівні Python -- O(n). Для 30M doc_id це ~2 ГБ пам'яті (64 байти на int у set), що прийнятно.

Етап 3: Моніторинг і PostgreSQL shared memory

Коли імпортуєш мільйони записів, потрібна observability. Ми побудували адмін-сторінку з cross-environment агрегацією:

Інцидент: PG error 53100

could not resize shared memory segment -- No space left on device

Root cause. Запит LEFT JOIN edrsr_documents (45M) x edrsr_fulltext з GROUP BY EXTRACT(YEAR FROM adjudication_date) потребував hash join. PostgreSQL алокує hash table у shared memory. З work_mem=256MB одна така операція з'їдала весь shm_size контейнера (Docker default: 64 МБ).

Auto-refresh frontend кожні 30с = ~120 таких запитів/год. Кожен -- потенційний OOM на shared memory.

Три рівні захисту:

1. Query decomposition. Замість одного JOIN -- два окремі COUNT:

-- Query 1: metadata counts
SELECT EXTRACT(YEAR FROM adjudication_date)::int AS year,
       COUNT(*)::int AS total FROM edrsr_documents GROUP BY year;

-- Query 2: fulltext counts
SELECT EXTRACT(YEAR FROM d.adjudication_date)::int AS year,
       COUNT(f.doc_id)::int AS with_fulltext
FROM edrsr_documents d
LEFT JOIN edrsr_fulltext f ON f.doc_id = d.doc_id GROUP BY year;

Merge відбувається в Node.js. Кожен запит працює з меншим hash table.

2. work_mem throttling. SET LOCAL work_mem='32MB' в транзакції. 32 МБ замість 256 МБ -- 8x менше тиску на shared memory. SET LOCAL скидається після транзакції, не впливає на інші з'єднання.

3. In-memory cache (TTL 5 хв). Node.js Map з timestamp. Ідентичні відповіді віддаються з кешу. 120 запитів/год → 12 запитів/год.

Safety net: shm_size: 2g в Docker Compose. Не фікс, а страховка.

Архітектура шардингу: 4 бази на одному PostgreSQL

Capacity planning

60M рядків × ~4.7 КБ середній розмір (текст + overhead) = ~283 ГБ
EC2 t3.xlarge: 4 vCPU, 16 ГБ RAM, EBS gp3
shared_buffers = 4 ГБ (25% RAM)
effective_cache_size = 12 ГБ

283 ГБ даних при 4 ГБ shared_buffers означає buffer hit ratio ~1.4%. Для sequential scan (VACUUM, ANALYZE) це прийнятно. Для point lookups по doc_id (PK) -- B-tree індекс ~2.8 ГБ поміщається в shared_buffers.

Проблема single-database: pg_dump 283 ГБ -- це ~4 години. Якщо впаде на 90% -- починаєте спочатку. VACUUM FULL на таблиці 283 ГБ -- потрібен подвійний дисковий простір (566 ГБ). autovacuum на 60M рядків з великим dead tuple ratio може працювати годинами.

Стратегія шардингу

Application-level sharding по doc_id ranges. 4 окремі бази в одному PostgreSQL-контейнері:

Шард База Діапазон doc_id Рядків Розмір Backup time
S1 secondlayer_prod < 112M ~24M 146 ГБ ~90 хв
S2 secondlayer_prod_ft2 112M--150M ~26M 101 ГБ ~60 хв
S3 secondlayer_prod_ft3 150M--175M ~8M 27 ГБ ~15 хв
S4 secondlayer_prod_ft4 > 175M ~2M 8 ГБ ~2 хв

Чому не нативний partitioning? Declarative range partitions вирішили б проблему VACUUM (кожна partition -- окрема heap), але NOT pg_dump: всі партиції живуть в одній базі, і дамп/рестор працює на рівні бази цілком. З окремими базами -- 4 незалежні pg_dump | pg_restore паралельно.

Чому не Citus? Citus потребує coordinator + workers (мінімум 2 ноди) або managed-сервіс. Наш access pattern -- point lookups по doc_id -- не потребує distributed query planning. Також Citus не дає незалежних backup domains.

Чому не FDW (Foreign Data Wrappers)? Розглядали postgres_fdw для прозорого cross-shard query. Відкинули: fdw додає latency (~2ms overhead на запит), не підтримує pushdown для всіх операцій, і ускладнює backup (fdw-таблиці не дампляться стандартним pg_dump).

Маршрутизація запитів

Ключ шардингу -- doc_id (BIGINT). Монотонно зростає, тому range sharding природний:

doc_id < 112,000,000        → secondlayer_prod      (S1)
112M ≤ doc_id < 150,000,000 → secondlayer_prod_ft2  (S2)
150M ≤ doc_id < 175,000,000 → secondlayer_prod_ft3  (S3)
doc_id ≥ 175,000,000        → secondlayer_prod_ft4  (S4)

Backend маршрутизує на рівні connection pool: 4 пули PgBouncer, кожен на свою базу. Для нового шарду -- додати базу, пул, і оновити range map.

Моніторинг: endpoint /api/internal/edrsr-stats збирає count з усіх шардів через pg_class.reltuples (approximate count, O(1)) замість COUNT(*) (sequential scan, O(n)).

Trade-offs

Аспект Плюс Мінус
Backup Незалежний per-shard (ft4 = 2 хв) 4 окремі cron jobs
VACUUM Паралельний, менші таблиці 4 autovacuum workers
Queries Point lookup O(log n) Cross-shard JOIN тільки в Node.js
Connections Ізольовані пули 4× connection overhead в PgBouncer
Ops Можна дропнути/перебудувати один шард Ручний range management

Копіювання на продакшн: двофазний ETL

Перенести 60M рядків (283 ГБ) з локального PG на 4 шарди продакшну через мережу -- окрема інженерна задача. Скрипт copy-fulltext-to-prod.py реалізує двофазний підхід.

Фаза 1: Export (sequential read → TSV chunks)

# Один streaming COPY з local PG → TSV-файли на NVMe
export_sql = "\\COPY (SELECT doc_id, full_text FROM edrsr_fulltext "
             f"WHERE {where} ORDER BY doc_id) TO STDOUT WITH (FORMAT text)"

proc = subprocess.Popen(LOCAL_CMD + ["-c", export_sql], stdout=PIPE)
for line in proc.stdout:  # streaming, без накопичення в пам'яті
    current_file.write(line)
    if line_count >= chunk_size:  # default 5000 рядків
        rotate_to_next_chunk()

Чому TSV, а не CSV? COPY text format (TSV) -- native PostgreSQL формат. Не потрібен CSV parsing на стороні прийому. Escaping простіший: tab-separated, backslash-escaping.

Чому chunk files, а не pipe? Resumability. Якщо мережа впаде на 70% uploadu -- restart підбирає невідправлені чанки. Кожен чанк = atomic unit of work.

I/O pattern: Sequential read з local PG (NVMe) → sequential write в /tmp/edrsr-ft-chunks/. Один потік, без конкуренції за диск.

Фаза 2: Upload (parallel workers → prod PG)

Pool(processes=200)  # 200 паралельних psql-процесів
pool.imap_unordered(upload_chunk, chunk_files, chunksize=1)

Кожен воркер:

  1. Читає TSV-чанк з диска (~5000 рядків, ~25 МБ)
  2. Формує SQL: CREATE TEMP TABLECOPY FROM STDININSERT ON CONFLICTDROP TABLE
  3. Виконує через subprocess.run(["psql", "-h", prod_host, ...])
  4. Парсить stdout на INSERT 0 N для підрахунку скопійованих
  5. Видаляє чанк-файл після успіху

Чому psql subprocess, а не psycopg2? Python GIL. 200 тредів з psycopg2 серіалізуються на GIL при обробці мережевих буферів. 200 subprocess -- це 200 окремих процесів, кожен з власним TCP-з'єднанням. Повна утилізація мережевої пропускної здатності.

SET lock_timeout = '5min' на кожному чанку -- захист від deadlock при конкурентних INSERT в один шард.

Resumability: Чанки видаляються тільки після успішного INSERT. --skip-export дозволяє перезапустити тільки фазу upload з наявних чанків. --resume-from-doc-id дозволяє доекспортувати нові дані до існуючих чанків.

Прогрес: кожні 200 чанків: copied, skipped (already exist), errors, rows/s, ETA.

Розмір воркер-пулу: чому 200?

Продакшн PostgreSQL: max_connections=500, PgBouncer у transaction mode. 200 воркерів = 200 concurrent connections. Кожен воркер тримає з'єднання ~2-5 секунд (COPY + INSERT). При 200 workers і chunk_size=5000: throughput ~100K-200K rows/s, залежно від мережевої латентності.

500 воркерів -- oversaturation: PG починає тротлити на lock contention (concurrent INSERT в той самий індекс). 100 воркерів -- недовантаження мережі. 200 -- емпіричний оптимум для нашого EC2 t3.xlarge.

Data quality

Метрика Значення
RTF-конвертація: точність 99.5% (manual validation, n=1000)
Покриття по роках (2021-2026) 94-97%
Gaps 3-6% -- документи без RTF (тільки метадані)
Дублікати 0 (ON CONFLICT DO NOTHING)
Encoding errors <0.1% (surrogate replacement)

3-6% gap -- це документи, для яких ЄДРСР не публікує повний текст (закриті провадження, рішення з обмеженим доступом за ЗУ «Про судоустрій та статус суддів»).

Результати

Метрика Значення
Повних текстів на проді ~60,000,000
Шардів 4 (одна PG інстанція, EC2 t3.xlarge)
Загальний розмір 283 ГБ (EBS gp3)
Індекси (B-tree PK) ~2.8 ГБ per shard
Backup S4 (8 ГБ) ~2 хв
Backup S1 (146 ГБ) ~90 хв
Воркерів завантаження 100 (asyncio)
Воркерів конвертації 12 (multiprocessing)
Воркерів продакшн-копії 200 (subprocess)
Pipeline idempotent Так (ON CONFLICT DO NOTHING + file-level resume)

Що далі

Повні тексти -- це сировина для двох наступних шарів:

  1. Векторні ембедінги. 60M × 1536 dim (text-embedding-ada-002) = ~350 ГБ у Qdrant. Це потребує batch-embedding pipeline з rate limiting (OpenAI TPM), chunking довгих текстів, та incremental update strategy.

  2. Semantic sectioning. Розбиття рішень на логічні секції (мотивувальна частина, резолютивна частина, окрема думка) для точнішого пошуку. SemanticSectionizer вже працює для окремих документів, але batch-обробка 60M -- окремий виклик.


Відкриті дані -- це не компроміс. Це архітектурне рішення. 60 мільйонів повних текстів, 283 ГБ на 4 шардах, idempotent pipeline з нульовою толерантністю до втрати даних -- і все побудовано на публічних джерелах, без залежності від комерційних API.