LEX — AI Legal Platform for Law Firms

AI-powered legal analysis platform for law firms and corporate counsel.

Features

Resources

Blog Articles

Technology

Built on AWS (EC2, Bedrock Claude AI, ALB, WAF, S3, ACM, KMS). PostgreSQL, Redis, Qdrant vector database. TypeScript, React, Node.js.

Start free — 50 credits on registration. Sign up

TECH 15 мин

ЕГРСР: data pipeline для 60 миллионов судебных решений

60 миллионов полных текстов. 283 ГБ на 4 шардах. Кастомный RTF-парсер с depth-tracking для Windows-1251 кириллицы. Двухфазный ETL с idempotent upsert через temp-таблицы. Application-level sharding по doc_id с независимыми backup domains. PostgreSQL shared memory exhaustion и три уровня защиты. Всё на открытых данных ЕГРСР.

ЕГРСР: data pipeline для 60 миллионов судебных решений

Архитектура ETL-системы, которая переносит весь Единый государственный реестр судебных решений в 4-шардовую PostgreSQL-инфраструктуру -- от модели данных и RTF-парсинга до capacity planning и операционных trade-offs.


Контекст задачи

LEX AI -- платформа семантического поиска по судебной практике. Ядро поиска -- векторные эмбеддинги (text-embedding-ada-002, 1536 dim), которые генерируются из полных текстов решений. Без текста нет эмбеддингов, без эмбеддингов нет семантического поиска.

ЕГРСР (Единый государственный реестр судебных решений) -- это ~60M документов от 685 судов всех инстанций, с 2006 года по сегодня. Полные тексты хранятся в формате RTF с кодировкой Windows-1251.

Масштаб задачи:

Параметр Значение
Документов в реестре ~60,000,000
Средний размер RTF ~4.5 КБ
Средний размер plaintext ~2.3 КБ
Суммарный объём текста 283 ГБ (PostgreSQL)
Судов-источников 685
Временной диапазон 2006--2026

Принципиальное решение: только открытые данные

Мы сознательно выбрали работать исключительно с открытыми источниками. Портал reyestr.court.gov.ua публикует судебные решения в открытом доступе -- это публичная информация по Закону Украины «О доступе к публичной информации».

Причина не только этическая. Коммерческие API несут операционные риски: rate limits, блокировка токенов при bulk-загрузке, зависимость от третьей стороны. Конкретный инцидент: bulk-загрузка court_sessions (~35K запросов за 2.7 часа) привела к блокировке обоих API-токенов ZakonOnline, что вывело из строя продакшн-чат.

Источник Что получаем Модель доступа
reyestr.court.gov.ua Полные тексты в RTF HTTP GET, rate-limited, бесплатно
data.gov.ua Метаданные (CSV dumps) Bulk download, обновление ежедневно
Коммерческие API То же + JSON REST API, платно, токены блокируются

Модель данных

Прежде чем говорить о pipeline, стоит понять целевую схему. Мы разделили метаданные и полные тексты в две отдельные таблицы -- это ключевое архитектурное решение.

Метаданные: edrsr_documents

CREATE TABLE edrsr_documents (
  doc_id       BIGINT PRIMARY KEY,   -- PK из ЕГРСР, автоинкремент
  court_code   INTEGER,              -- FK на edrsr_courts (без constraint)
  judgment_code SMALLINT,            -- тип решения (приговор, определение, постановление)
  justice_kind SMALLINT,             -- вид судопроизводства
  category_code INTEGER,             -- категория дела (4106 категорий)
  cause_num    TEXT,                  -- номер дела
  adjudication_date TIMESTAMPTZ,     -- дата вынесения
  receipt_date TIMESTAMPTZ,          -- дата поступления в реестр
  judge        TEXT,                  -- судья/коллегия
  doc_url      TEXT,                  -- URL на RTF в реестре
  status       SMALLINT DEFAULT 0,
  date_publ    TIMESTAMPTZ
);

Намеренное отсутствие FK constraints. Исходные данные с data.gov.ua содержат court_code, justice_kind, category_code, которые не всегда присутствуют в справочных таблицах. С FK constraints импорт ломается на каждой «грязной» строке. Без них -- мы импортируем всё, а валидацию делаем на уровне запросов.

Почему doc_id BIGINT, а не UUID? doc_id -- это натуральный ключ из ЕГРСР (автоинкремент). Он монотонно растёт, что даёт идеальный B-tree с минимальной фрагментацией при последовательном импорте. UUID дал бы случайные вставки по всему индексу -- на 60M строк это существенная разница в I/O.

8 индексов на типичные паттерны запросов: court_code, justice_kind, judgment_code, category_code, cause_num, judge, adjudication_date, receipt_date. Каждый обоснован реальным use case (фильтрация по суду, по виду судопроизводства, поиск по номеру дела).

Полные тексты: edrsr_fulltext

CREATE TABLE edrsr_fulltext (
  doc_id      BIGINT PRIMARY KEY,  -- join key к edrsr_documents
  full_text   TEXT,                -- plaintext после RTF-конвертации
  text_length INTEGER,             -- pre-computed для фильтрации
  created_at  TIMESTAMP DEFAULT NOW()
);

Почему отдельная таблица, а не колонка в edrsr_documents? Три причины:

  1. TOAST-сегментация. PostgreSQL хранит TEXT > 2 КБ в отдельных TOAST-страницах. Если full_text лежит в той же таблице, что и метаданные, то SELECT court_code, cause_num FROM edrsr_documents всё равно будет затрагивать TOAST-страницы при sequential scan. Отдельная таблица = чистый sequential scan по метаданным без overhead.

  2. Разные lifecycle. Метаданные импортируются из CSV-дампов data.gov.ua (ежедневное обновление). Полные тексты загружаются с reyestr.court.gov.ua (одноразовый bulk + incremental). Разные источники, разные скрипты, разная частота.

  3. Независимый шардинг. Полные тексты занимают 283 ГБ против ~12 ГБ метаданных. Шардить нужно только тексты, метаданные остаются в одной базе.

Справочники

5 справочных таблиц: courts (685), instances (3), regions (27), justice_kinds (5), judgment_forms (10+), cause_categories (4106). Импортируются один раз, обновляются редко.

Архитектура pipeline

Pipeline реализован как 4 независимых Python-скрипта. Каждый idempotent -- можно перезапускать без потери данных и дубликатов.

┌─────────────────────┐    ┌──────────────────────┐    ┌──────────────────┐    ┌──────────────────────┐
│  1. Download RTF    │    │  2. Import from HDD  │    │  3. Monitor      │    │  4. Copy to Prod     │
│                     │    │                      │    │                  │    │                      │
│  asyncio + aiohttp  │───▶│  multiprocessing     │───▶│  PG aggregate    │───▶│  2-phase ETL         │
│  100 workers        │    │  12 CPU workers      │    │  + in-mem cache  │    │  200 psql workers    │
│  3 retries + backoff│    │  COPY FROM STDIN     │    │  cross-env stats │    │  TSV chunks on NVMe  │
│                     │    │  ON CONFLICT NOTHING  │    │                  │    │  ON CONFLICT NOTHING  │
│  reyestr.court.gov  │    │  HDD → PG local      │    │  local/stage/prod│    │  PG local → PG prod  │
│  → /tmp/edrsr-rtf/  │    │  18 TB /dev/sda1     │    │                  │    │  per-shard routing   │
└─────────────────────┘    └──────────────────────┘    └──────────────────┘    └──────────────────────┘

Этап 1: Загрузка RTF

I/O-модель: async HTTP GET → disk write. Network-bound задача, поэтому asyncio + aiohttp с TCPConnector(limit=100, limit_per_host=100).

semaphore = asyncio.Semaphore(100)  # 100 concurrent downloads
# Retry: 3 attempts, exponential backoff (2s, 4s, 6s)
# 429 handling: sleep 5 * (attempt + 1) seconds

Resumability. Перед загрузкой проверяем outpath.exists() and outpath.stat().st_size > 0. Если файл уже есть и не пустой -- пропускаем. Это позволяет перезапускать скрипт без повторной загрузки.

Файловая конвенция: {doc_id}.rtf -- doc_id является именем файла. Это даёт O(1) lookup без базы метаданных: int(filename[:-4]) → doc_id.

RTF-парсер: почему кастомный

RTF из ЕГРСР -- не обычный RTF. Это Windows-1251 кириллица, закодированная как \\'XX escape-последовательности внутри latin1-обёртки. Стандартные библиотеки (striprtf, pyrtf-ng) не различают Windows-1251 и latin1 байты и ломают кириллицу.

Наш парсер работает в 7 шагов:

1. raw bytes → latin1 decode (RTF envelope)
2. Remove nested groups: {\fonttbl ...}, {\colortbl ...},
   {\stylesheet ...}, {\info ...}, {\*\ ...}
   (depth-tracking brace parser, O(n))
3. Strip \rtf1 header
4. \par → \n, \line → \n, \tab → \t
5. \\'XX → Windows-1251 byte decode
6. \uNNNNN → chr(code), range check 0..0x10FFFF
7. Strip remaining \keyword sequences
8. Remove braces, null bytes, normalize newlines
9. UTF-8 surrogate cleanup: encode('utf-8', errors='surrogatepass')
                            .decode('utf-8', errors='replace')

Depth-tracking для вложенных групп. RTF-группа {\fonttbl {\f0 Times;}} может иметь произвольную глубину вложенности. Парсер отслеживает баланс {} и удаляет всю группу от открывающей до закрывающей скобки на том же уровне. Сложность O(n) по длине документа.

Точность: 99.5% на корпусе ~1000 вручную проверенных документов. 0.5% ошибок -- документы с нестандартными RTF-расширениями (встроенные изображения, OLE-объекты), где текст всё равно извлекается, но с артефактами.

Этап 2: Массовый импорт с HDD

Это главная рабочая лошадка pipeline. Все RTF-файлы лежат на 18 ТБ HDD (/dev/sda1), и скрипт должен конвертировать их в текст и загрузить в PostgreSQL.

Почему multiprocessing, а не asyncio? RTF-конвертация -- CPU-bound: 7 regex замен, итерация по символам для depth-tracking, encode/decode. Python GIL блокирует параллельное выполнение CPU-bound кода в тредах. multiprocessing.Pool с 12 воркерами (= количество ядер) обходит GIL через отдельные процессы.

Pool(processes=12, initializer=_init_worker, initargs=(rtf_lookup,))
pool.map(convert_one, batch_ids, chunksize=50)

chunksize=50: баланс между overhead на IPC (передача задач между процессами) и granularity. При chunksize=1 IPC overhead доминирует. При chunksize=1000 один медленный файл блокирует весь чанк.

I/O-паттерн: scandir вместо stat

На HDD с 15M+ файлов os.stat() -- bottleneck. Каждый stat() -- отдельный I/O seek на шпиндельном диске. При 15M файлов это ~4 часа только на stat().

# Один проход scandir -- построение lookup O(n)
rtf_lookup: dict[int, Path] = {}
for entry in os.scandir(rtf_dir):   # readdir, без stat()
    if entry.name.endswith('.rtf'):
        doc_id = int(entry.name[:-4])
        rtf_lookup[doc_id] = rtf_dir / entry.name

os.scandir() вызывает readdir() системного уровня, который возвращает имена файлов без stat(). Это один sequential read директории вместо 15M random seeks.

Idempotent upsert через temp-таблицу

Критический паттерн для любого data pipeline на больших объёмах:

CREATE TEMP TABLE _ft_tmp (doc_id bigint, full_text text);
COPY _ft_tmp FROM stdin;            -- bulk load во временную
INSERT INTO edrsr_fulltext(doc_id, full_text)
SELECT doc_id, full_text FROM _ft_tmp
ON CONFLICT (doc_id) DO NOTHING;    -- idempotent: дубликаты игнорируются
DROP TABLE _ft_tmp;

Почему не прямой COPY INTO edrsr_fulltext? COPY не поддерживает ON CONFLICT. Если в batch есть doc_id, который уже существует, весь COPY падает. Temp-таблица + INSERT ON CONFLICT -- это staging area с дедупликацией.

Почему не INSERT ... ON CONFLICT DO UPDATE? DO NOTHING дешевле: не генерирует WAL для неизменённых строк. Тексты не меняются после первого импорта, поэтому UPDATE не нужен.

Проверка уже импортированных

Перед конвертацией скрипт выгружает existing doc_id:

SELECT doc_id FROM edrsr_fulltext WHERE doc_id BETWEEN {min_id} AND {max_id};
to_import = sorted(set(rtf_lookup.keys()) - existing)

Это set difference на уровне Python -- O(n). Для 30M doc_id это ~2 ГБ памяти (64 байта на int в set), что приемлемо.

Этап 3: Мониторинг и PostgreSQL shared memory

Когда импортируешь миллионы записей, нужна observability. Мы построили админ-страницу с cross-environment агрегацией:

Инцидент: PG error 53100

could not resize shared memory segment -- No space left on device

Root cause. Запрос LEFT JOIN edrsr_documents (45M) x edrsr_fulltext с GROUP BY EXTRACT(YEAR FROM adjudication_date) требовал hash join. PostgreSQL аллоцирует hash table в shared memory. С work_mem=256MB одна такая операция съедала весь shm_size контейнера (Docker default: 64 МБ).

Auto-refresh frontend каждые 30с = ~120 таких запросов/час. Каждый -- потенциальный OOM на shared memory.

Три уровня защиты:

1. Query decomposition. Вместо одного JOIN -- два отдельных COUNT:

-- Query 1: metadata counts
SELECT EXTRACT(YEAR FROM adjudication_date)::int AS year,
       COUNT(*)::int AS total FROM edrsr_documents GROUP BY year;

-- Query 2: fulltext counts
SELECT EXTRACT(YEAR FROM d.adjudication_date)::int AS year,
       COUNT(f.doc_id)::int AS with_fulltext
FROM edrsr_documents d
LEFT JOIN edrsr_fulltext f ON f.doc_id = d.doc_id GROUP BY year;

Merge происходит в Node.js. Каждый запрос работает с меньшим hash table.

2. work_mem throttling. SET LOCAL work_mem='32MB' в транзакции. 32 МБ вместо 256 МБ -- 8x меньше давления на shared memory. SET LOCAL сбрасывается после транзакции, не влияет на другие соединения.

3. In-memory cache (TTL 5 мин). Node.js Map с timestamp. Идентичные ответы отдаются из кеша. 120 запросов/час → 12 запросов/час.

Safety net: shm_size: 2g в Docker Compose. Не фикс, а страховка.

Архитектура шардинга: 4 базы на одном PostgreSQL

Capacity planning

60M строк × ~4.7 КБ средний размер (текст + overhead) = ~283 ГБ
EC2 t3.xlarge: 4 vCPU, 16 ГБ RAM, EBS gp3
shared_buffers = 4 ГБ (25% RAM)
effective_cache_size = 12 ГБ

283 ГБ данных при 4 ГБ shared_buffers означает buffer hit ratio ~1.4%. Для sequential scan (VACUUM, ANALYZE) это приемлемо. Для point lookups по doc_id (PK) -- B-tree индекс ~2.8 ГБ помещается в shared_buffers.

Проблема single-database: pg_dump 283 ГБ -- это ~4 часа. Если упадёт на 90% -- начинаете сначала. VACUUM FULL на таблице 283 ГБ -- нужен двойной дисковый объём (566 ГБ). autovacuum на 60M строк с большим dead tuple ratio может работать часами.

Стратегия шардинга

Application-level sharding по doc_id ranges. 4 отдельные базы в одном PostgreSQL-контейнере:

Шард База Диапазон doc_id Строк Размер Backup time
S1 secondlayer_prod < 112M ~24M 146 ГБ ~90 мин
S2 secondlayer_prod_ft2 112M--150M ~26M 101 ГБ ~60 мин
S3 secondlayer_prod_ft3 150M--175M ~8M 27 ГБ ~15 мин
S4 secondlayer_prod_ft4 > 175M ~2M 8 ГБ ~2 мин

Почему не нативный partitioning? Declarative range partitions решили бы проблему VACUUM (каждая partition -- отдельный heap), но НЕ pg_dump: все партиции живут в одной базе, и дамп/рестор работает на уровне базы целиком. С отдельными базами -- 4 независимых pg_dump | pg_restore параллельно.

Почему не Citus? Citus требует coordinator + workers (минимум 2 ноды) или managed-сервис. Наш access pattern -- point lookups по doc_id -- не требует distributed query planning. Также Citus не даёт независимых backup domains.

Почему не FDW (Foreign Data Wrappers)? Рассматривали postgres_fdw для прозрачного cross-shard query. Отвергли: fdw добавляет latency (~2ms overhead на запрос), не поддерживает pushdown для всех операций, и усложняет backup (fdw-таблицы не дампятся стандартным pg_dump).

Маршрутизация запросов

Ключ шардинга -- doc_id (BIGINT). Монотонно растёт, поэтому range sharding естественен:

doc_id < 112,000,000        → secondlayer_prod      (S1)
112M ≤ doc_id < 150,000,000 → secondlayer_prod_ft2  (S2)
150M ≤ doc_id < 175,000,000 → secondlayer_prod_ft3  (S3)
doc_id ≥ 175,000,000        → secondlayer_prod_ft4  (S4)

Backend маршрутизирует на уровне connection pool: 4 пула PgBouncer, каждый на свою базу. Для нового шарда -- добавить базу, пул и обновить range map.

Мониторинг: endpoint /api/internal/edrsr-stats собирает count со всех шардов через pg_class.reltuples (approximate count, O(1)) вместо COUNT(*) (sequential scan, O(n)).

Trade-offs

Аспект Плюс Минус
Backup Независимый per-shard (ft4 = 2 мин) 4 отдельных cron jobs
VACUUM Параллельный, меньшие таблицы 4 autovacuum workers
Queries Point lookup O(log n) Cross-shard JOIN только в Node.js
Connections Изолированные пулы 4× connection overhead в PgBouncer
Ops Можно дропнуть/перестроить один шард Ручной range management

Копирование на продакшн: двухфазный ETL

Перенести 60M строк (283 ГБ) с локального PG на 4 шарда продакшна через сеть -- отдельная инженерная задача. Скрипт copy-fulltext-to-prod.py реализует двухфазный подход.

Фаза 1: Export (sequential read → TSV chunks)

# Один streaming COPY из local PG → TSV-файлы на NVMe
export_sql = "\\COPY (SELECT doc_id, full_text FROM edrsr_fulltext "
             f"WHERE {where} ORDER BY doc_id) TO STDOUT WITH (FORMAT text)"

proc = subprocess.Popen(LOCAL_CMD + ["-c", export_sql], stdout=PIPE)
for line in proc.stdout:  # streaming, без накопления в памяти
    current_file.write(line)
    if line_count >= chunk_size:  # default 5000 строк
        rotate_to_next_chunk()

Почему TSV, а не CSV? COPY text format (TSV) -- native PostgreSQL формат. Не нужен CSV parsing на стороне приёма. Escaping проще: tab-separated, backslash-escaping.

Почему chunk files, а не pipe? Resumability. Если сеть упадёт на 70% upload -- restart подберёт неотправленные чанки. Каждый чанк = atomic unit of work.

I/O pattern: Sequential read из local PG (NVMe) → sequential write в /tmp/edrsr-ft-chunks/. Один поток, без конкуренции за диск.

Фаза 2: Upload (parallel workers → prod PG)

Pool(processes=200)  # 200 параллельных psql-процессов
pool.imap_unordered(upload_chunk, chunk_files, chunksize=1)

Каждый воркер:

  1. Читает TSV-чанк с диска (~5000 строк, ~25 МБ)
  2. Формирует SQL: CREATE TEMP TABLECOPY FROM STDININSERT ON CONFLICTDROP TABLE
  3. Выполняет через subprocess.run(["psql", "-h", prod_host, ...])
  4. Парсит stdout на INSERT 0 N для подсчёта скопированных
  5. Удаляет чанк-файл после успеха

Почему psql subprocess, а не psycopg2? Python GIL. 200 тредов с psycopg2 сериализуются на GIL при обработке сетевых буферов. 200 subprocess -- это 200 отдельных процессов, каждый со своим TCP-соединением. Полная утилизация сетевой пропускной способности.

SET lock_timeout = '5min' на каждом чанке -- защита от deadlock при конкурентных INSERT в один шард.

Resumability: Чанки удаляются только после успешного INSERT. --skip-export позволяет перезапустить только фазу upload из имеющихся чанков. --resume-from-doc-id позволяет доэкспортировать новые данные к существующим чанкам.

Прогресс: каждые 200 чанков: copied, skipped (already exist), errors, rows/s, ETA.

Размер воркер-пула: почему 200?

Продакшн PostgreSQL: max_connections=500, PgBouncer в transaction mode. 200 воркеров = 200 concurrent connections. Каждый воркер держит соединение ~2-5 секунд (COPY + INSERT). При 200 workers и chunk_size=5000: throughput ~100K-200K rows/s, в зависимости от сетевой латентности.

500 воркеров -- oversaturation: PG начинает тротлить на lock contention (concurrent INSERT в тот же индекс). 100 воркеров -- недогрузка сети. 200 -- эмпирический оптимум для нашего EC2 t3.xlarge.

Data quality

Метрика Значение
RTF-конвертация: точность 99.5% (manual validation, n=1000)
Покрытие по годам (2021-2026) 94-97%
Gaps 3-6% -- документы без RTF (только метаданные)
Дубликаты 0 (ON CONFLICT DO NOTHING)
Encoding errors <0.1% (surrogate replacement)

3-6% gap -- это документы, для которых ЕГРСР не публикует полный текст (закрытые производства, решения с ограниченным доступом по ЗУ «О судоустройстве и статусе судей»).

Результаты

Метрика Значение
Полных текстов на проде ~60,000,000
Шардов 4 (одна PG инстанция, EC2 t3.xlarge)
Общий размер 283 ГБ (EBS gp3)
Индексы (B-tree PK) ~2.8 ГБ per shard
Backup S4 (8 ГБ) ~2 мин
Backup S1 (146 ГБ) ~90 мин
Воркеров загрузки 100 (asyncio)
Воркеров конвертации 12 (multiprocessing)
Воркеров продакшн-копии 200 (subprocess)
Pipeline idempotent Да (ON CONFLICT DO NOTHING + file-level resume)

Что дальше

Полные тексты -- это сырьё для двух следующих слоёв:

  1. Векторные эмбеддинги. 60M × 1536 dim (text-embedding-ada-002) = ~350 ГБ в Qdrant. Это потребует batch-embedding pipeline с rate limiting (OpenAI TPM), chunking длинных текстов и incremental update strategy.

  2. Semantic sectioning. Разбиение решений на логические секции (мотивировочная часть, резолютивная часть, особое мнение) для более точного поиска. SemanticSectionizer уже работает для отдельных документов, но batch-обработка 60M -- отдельный вызов.


Открытые данные -- это не компромисс. Это архитектурное решение. 60 миллионов полных текстов, 283 ГБ на 4 шардах, idempotent pipeline с нулевой толерантностью к потере данных -- и всё построено на публичных источниках, без зависимости от коммерческих API.