ЕГРСР: data pipeline для 60 миллионов судебных решений
60 миллионов полных текстов. 283 ГБ на 4 шардах. Кастомный RTF-парсер с depth-tracking для Windows-1251 кириллицы. Двухфазный ETL с idempotent upsert через temp-таблицы. Application-level sharding по doc_id с независимыми backup domains. PostgreSQL shared memory exhaustion и три уровня защиты. Всё на открытых данных ЕГРСР.
ЕГРСР: data pipeline для 60 миллионов судебных решений
Архитектура ETL-системы, которая переносит весь Единый государственный реестр судебных решений в 4-шардовую PostgreSQL-инфраструктуру -- от модели данных и RTF-парсинга до capacity planning и операционных trade-offs.
Контекст задачи
LEX AI -- платформа семантического поиска по судебной практике. Ядро поиска -- векторные эмбеддинги (text-embedding-ada-002, 1536 dim), которые генерируются из полных текстов решений. Без текста нет эмбеддингов, без эмбеддингов нет семантического поиска.
ЕГРСР (Единый государственный реестр судебных решений) -- это ~60M документов от 685 судов всех инстанций, с 2006 года по сегодня. Полные тексты хранятся в формате RTF с кодировкой Windows-1251.
Масштаб задачи:
| Параметр | Значение |
|---|---|
| Документов в реестре | ~60,000,000 |
| Средний размер RTF | ~4.5 КБ |
| Средний размер plaintext | ~2.3 КБ |
| Суммарный объём текста | 283 ГБ (PostgreSQL) |
| Судов-источников | 685 |
| Временной диапазон | 2006--2026 |
Принципиальное решение: только открытые данные
Мы сознательно выбрали работать исключительно с открытыми источниками. Портал reyestr.court.gov.ua публикует судебные решения в открытом доступе -- это публичная информация по Закону Украины «О доступе к публичной информации».
Причина не только этическая. Коммерческие API несут операционные риски: rate limits, блокировка токенов при bulk-загрузке, зависимость от третьей стороны. Конкретный инцидент: bulk-загрузка court_sessions (~35K запросов за 2.7 часа) привела к блокировке обоих API-токенов ZakonOnline, что вывело из строя продакшн-чат.
| Источник | Что получаем | Модель доступа |
|---|---|---|
| reyestr.court.gov.ua | Полные тексты в RTF | HTTP GET, rate-limited, бесплатно |
| data.gov.ua | Метаданные (CSV dumps) | Bulk download, обновление ежедневно |
| Коммерческие API | То же + JSON | REST API, платно, токены блокируются |
Модель данных
Прежде чем говорить о pipeline, стоит понять целевую схему. Мы разделили метаданные и полные тексты в две отдельные таблицы -- это ключевое архитектурное решение.
Метаданные: edrsr_documents
CREATE TABLE edrsr_documents (
doc_id BIGINT PRIMARY KEY, -- PK из ЕГРСР, автоинкремент
court_code INTEGER, -- FK на edrsr_courts (без constraint)
judgment_code SMALLINT, -- тип решения (приговор, определение, постановление)
justice_kind SMALLINT, -- вид судопроизводства
category_code INTEGER, -- категория дела (4106 категорий)
cause_num TEXT, -- номер дела
adjudication_date TIMESTAMPTZ, -- дата вынесения
receipt_date TIMESTAMPTZ, -- дата поступления в реестр
judge TEXT, -- судья/коллегия
doc_url TEXT, -- URL на RTF в реестре
status SMALLINT DEFAULT 0,
date_publ TIMESTAMPTZ
);
Намеренное отсутствие FK constraints. Исходные данные с data.gov.ua содержат court_code, justice_kind, category_code, которые не всегда присутствуют в справочных таблицах. С FK constraints импорт ломается на каждой «грязной» строке. Без них -- мы импортируем всё, а валидацию делаем на уровне запросов.
Почему doc_id BIGINT, а не UUID? doc_id -- это натуральный ключ из ЕГРСР (автоинкремент). Он монотонно растёт, что даёт идеальный B-tree с минимальной фрагментацией при последовательном импорте. UUID дал бы случайные вставки по всему индексу -- на 60M строк это существенная разница в I/O.
8 индексов на типичные паттерны запросов: court_code, justice_kind, judgment_code, category_code, cause_num, judge, adjudication_date, receipt_date. Каждый обоснован реальным use case (фильтрация по суду, по виду судопроизводства, поиск по номеру дела).
Полные тексты: edrsr_fulltext
CREATE TABLE edrsr_fulltext (
doc_id BIGINT PRIMARY KEY, -- join key к edrsr_documents
full_text TEXT, -- plaintext после RTF-конвертации
text_length INTEGER, -- pre-computed для фильтрации
created_at TIMESTAMP DEFAULT NOW()
);
Почему отдельная таблица, а не колонка в edrsr_documents? Три причины:
TOAST-сегментация. PostgreSQL хранит TEXT > 2 КБ в отдельных TOAST-страницах. Если full_text лежит в той же таблице, что и метаданные, то
SELECT court_code, cause_num FROM edrsr_documentsвсё равно будет затрагивать TOAST-страницы при sequential scan. Отдельная таблица = чистый sequential scan по метаданным без overhead.Разные lifecycle. Метаданные импортируются из CSV-дампов data.gov.ua (ежедневное обновление). Полные тексты загружаются с reyestr.court.gov.ua (одноразовый bulk + incremental). Разные источники, разные скрипты, разная частота.
Независимый шардинг. Полные тексты занимают 283 ГБ против ~12 ГБ метаданных. Шардить нужно только тексты, метаданные остаются в одной базе.
Справочники
5 справочных таблиц: courts (685), instances (3), regions (27), justice_kinds (5), judgment_forms (10+), cause_categories (4106). Импортируются один раз, обновляются редко.
Архитектура pipeline
Pipeline реализован как 4 независимых Python-скрипта. Каждый idempotent -- можно перезапускать без потери данных и дубликатов.
┌─────────────────────┐ ┌──────────────────────┐ ┌──────────────────┐ ┌──────────────────────┐
│ 1. Download RTF │ │ 2. Import from HDD │ │ 3. Monitor │ │ 4. Copy to Prod │
│ │ │ │ │ │ │ │
│ asyncio + aiohttp │───▶│ multiprocessing │───▶│ PG aggregate │───▶│ 2-phase ETL │
│ 100 workers │ │ 12 CPU workers │ │ + in-mem cache │ │ 200 psql workers │
│ 3 retries + backoff│ │ COPY FROM STDIN │ │ cross-env stats │ │ TSV chunks on NVMe │
│ │ │ ON CONFLICT NOTHING │ │ │ │ ON CONFLICT NOTHING │
│ reyestr.court.gov │ │ HDD → PG local │ │ local/stage/prod│ │ PG local → PG prod │
│ → /tmp/edrsr-rtf/ │ │ 18 TB /dev/sda1 │ │ │ │ per-shard routing │
└─────────────────────┘ └──────────────────────┘ └──────────────────┘ └──────────────────────┘
Этап 1: Загрузка RTF
I/O-модель: async HTTP GET → disk write. Network-bound задача, поэтому asyncio + aiohttp с TCPConnector(limit=100, limit_per_host=100).
semaphore = asyncio.Semaphore(100) # 100 concurrent downloads
# Retry: 3 attempts, exponential backoff (2s, 4s, 6s)
# 429 handling: sleep 5 * (attempt + 1) seconds
Resumability. Перед загрузкой проверяем outpath.exists() and outpath.stat().st_size > 0. Если файл уже есть и не пустой -- пропускаем. Это позволяет перезапускать скрипт без повторной загрузки.
Файловая конвенция: {doc_id}.rtf -- doc_id является именем файла. Это даёт O(1) lookup без базы метаданных: int(filename[:-4]) → doc_id.
RTF-парсер: почему кастомный
RTF из ЕГРСР -- не обычный RTF. Это Windows-1251 кириллица, закодированная как \\'XX escape-последовательности внутри latin1-обёртки. Стандартные библиотеки (striprtf, pyrtf-ng) не различают Windows-1251 и latin1 байты и ломают кириллицу.
Наш парсер работает в 7 шагов:
1. raw bytes → latin1 decode (RTF envelope)
2. Remove nested groups: {\fonttbl ...}, {\colortbl ...},
{\stylesheet ...}, {\info ...}, {\*\ ...}
(depth-tracking brace parser, O(n))
3. Strip \rtf1 header
4. \par → \n, \line → \n, \tab → \t
5. \\'XX → Windows-1251 byte decode
6. \uNNNNN → chr(code), range check 0..0x10FFFF
7. Strip remaining \keyword sequences
8. Remove braces, null bytes, normalize newlines
9. UTF-8 surrogate cleanup: encode('utf-8', errors='surrogatepass')
.decode('utf-8', errors='replace')
Depth-tracking для вложенных групп. RTF-группа {\fonttbl {\f0 Times;}} может иметь произвольную глубину вложенности. Парсер отслеживает баланс {} и удаляет всю группу от открывающей до закрывающей скобки на том же уровне. Сложность O(n) по длине документа.
Точность: 99.5% на корпусе ~1000 вручную проверенных документов. 0.5% ошибок -- документы с нестандартными RTF-расширениями (встроенные изображения, OLE-объекты), где текст всё равно извлекается, но с артефактами.
Этап 2: Массовый импорт с HDD
Это главная рабочая лошадка pipeline. Все RTF-файлы лежат на 18 ТБ HDD (/dev/sda1), и скрипт должен конвертировать их в текст и загрузить в PostgreSQL.
Почему multiprocessing, а не asyncio? RTF-конвертация -- CPU-bound: 7 regex замен, итерация по символам для depth-tracking, encode/decode. Python GIL блокирует параллельное выполнение CPU-bound кода в тредах. multiprocessing.Pool с 12 воркерами (= количество ядер) обходит GIL через отдельные процессы.
Pool(processes=12, initializer=_init_worker, initargs=(rtf_lookup,))
pool.map(convert_one, batch_ids, chunksize=50)
chunksize=50: баланс между overhead на IPC (передача задач между процессами) и granularity. При chunksize=1 IPC overhead доминирует. При chunksize=1000 один медленный файл блокирует весь чанк.
I/O-паттерн: scandir вместо stat
На HDD с 15M+ файлов os.stat() -- bottleneck. Каждый stat() -- отдельный I/O seek на шпиндельном диске. При 15M файлов это ~4 часа только на stat().
# Один проход scandir -- построение lookup O(n)
rtf_lookup: dict[int, Path] = {}
for entry in os.scandir(rtf_dir): # readdir, без stat()
if entry.name.endswith('.rtf'):
doc_id = int(entry.name[:-4])
rtf_lookup[doc_id] = rtf_dir / entry.name
os.scandir() вызывает readdir() системного уровня, который возвращает имена файлов без stat(). Это один sequential read директории вместо 15M random seeks.
Idempotent upsert через temp-таблицу
Критический паттерн для любого data pipeline на больших объёмах:
CREATE TEMP TABLE _ft_tmp (doc_id bigint, full_text text);
COPY _ft_tmp FROM stdin; -- bulk load во временную
INSERT INTO edrsr_fulltext(doc_id, full_text)
SELECT doc_id, full_text FROM _ft_tmp
ON CONFLICT (doc_id) DO NOTHING; -- idempotent: дубликаты игнорируются
DROP TABLE _ft_tmp;
Почему не прямой COPY INTO edrsr_fulltext? COPY не поддерживает ON CONFLICT. Если в batch есть doc_id, который уже существует, весь COPY падает. Temp-таблица + INSERT ON CONFLICT -- это staging area с дедупликацией.
Почему не INSERT ... ON CONFLICT DO UPDATE? DO NOTHING дешевле: не генерирует WAL для неизменённых строк. Тексты не меняются после первого импорта, поэтому UPDATE не нужен.
Проверка уже импортированных
Перед конвертацией скрипт выгружает existing doc_id:
SELECT doc_id FROM edrsr_fulltext WHERE doc_id BETWEEN {min_id} AND {max_id};
to_import = sorted(set(rtf_lookup.keys()) - existing)
Это set difference на уровне Python -- O(n). Для 30M doc_id это ~2 ГБ памяти (64 байта на int в set), что приемлемо.
Этап 3: Мониторинг и PostgreSQL shared memory
Когда импортируешь миллионы записей, нужна observability. Мы построили админ-страницу с cross-environment агрегацией:
- KPI-карточки: total metadata, total fulltext, coverage %
- Таблица по годам с progress bars
- Данные из local, stage, prod (через
/api/internal/edrsr-stats) - Auto-refresh каждые 30 секунд
Инцидент: PG error 53100
could not resize shared memory segment -- No space left on device
Root cause. Запрос LEFT JOIN edrsr_documents (45M) x edrsr_fulltext с GROUP BY EXTRACT(YEAR FROM adjudication_date) требовал hash join. PostgreSQL аллоцирует hash table в shared memory. С work_mem=256MB одна такая операция съедала весь shm_size контейнера (Docker default: 64 МБ).
Auto-refresh frontend каждые 30с = ~120 таких запросов/час. Каждый -- потенциальный OOM на shared memory.
Три уровня защиты:
1. Query decomposition. Вместо одного JOIN -- два отдельных COUNT:
-- Query 1: metadata counts
SELECT EXTRACT(YEAR FROM adjudication_date)::int AS year,
COUNT(*)::int AS total FROM edrsr_documents GROUP BY year;
-- Query 2: fulltext counts
SELECT EXTRACT(YEAR FROM d.adjudication_date)::int AS year,
COUNT(f.doc_id)::int AS with_fulltext
FROM edrsr_documents d
LEFT JOIN edrsr_fulltext f ON f.doc_id = d.doc_id GROUP BY year;
Merge происходит в Node.js. Каждый запрос работает с меньшим hash table.
2. work_mem throttling. SET LOCAL work_mem='32MB' в транзакции. 32 МБ вместо 256 МБ -- 8x меньше давления на shared memory. SET LOCAL сбрасывается после транзакции, не влияет на другие соединения.
3. In-memory cache (TTL 5 мин). Node.js Map с timestamp. Идентичные ответы отдаются из кеша. 120 запросов/час → 12 запросов/час.
Safety net: shm_size: 2g в Docker Compose. Не фикс, а страховка.
Архитектура шардинга: 4 базы на одном PostgreSQL
Capacity planning
60M строк × ~4.7 КБ средний размер (текст + overhead) = ~283 ГБ
EC2 t3.xlarge: 4 vCPU, 16 ГБ RAM, EBS gp3
shared_buffers = 4 ГБ (25% RAM)
effective_cache_size = 12 ГБ
283 ГБ данных при 4 ГБ shared_buffers означает buffer hit ratio ~1.4%. Для sequential scan (VACUUM, ANALYZE) это приемлемо. Для point lookups по doc_id (PK) -- B-tree индекс ~2.8 ГБ помещается в shared_buffers.
Проблема single-database: pg_dump 283 ГБ -- это ~4 часа. Если упадёт на 90% -- начинаете сначала. VACUUM FULL на таблице 283 ГБ -- нужен двойной дисковый объём (566 ГБ). autovacuum на 60M строк с большим dead tuple ratio может работать часами.
Стратегия шардинга
Application-level sharding по doc_id ranges. 4 отдельные базы в одном PostgreSQL-контейнере:
| Шард | База | Диапазон doc_id | Строк | Размер | Backup time |
|---|---|---|---|---|---|
| S1 | secondlayer_prod |
< 112M | ~24M | 146 ГБ | ~90 мин |
| S2 | secondlayer_prod_ft2 |
112M--150M | ~26M | 101 ГБ | ~60 мин |
| S3 | secondlayer_prod_ft3 |
150M--175M | ~8M | 27 ГБ | ~15 мин |
| S4 | secondlayer_prod_ft4 |
> 175M | ~2M | 8 ГБ | ~2 мин |
Почему не нативный partitioning? Declarative range partitions решили бы проблему VACUUM (каждая partition -- отдельный heap), но НЕ pg_dump: все партиции живут в одной базе, и дамп/рестор работает на уровне базы целиком. С отдельными базами -- 4 независимых pg_dump | pg_restore параллельно.
Почему не Citus? Citus требует coordinator + workers (минимум 2 ноды) или managed-сервис. Наш access pattern -- point lookups по doc_id -- не требует distributed query planning. Также Citus не даёт независимых backup domains.
Почему не FDW (Foreign Data Wrappers)? Рассматривали postgres_fdw для прозрачного cross-shard query. Отвергли: fdw добавляет latency (~2ms overhead на запрос), не поддерживает pushdown для всех операций, и усложняет backup (fdw-таблицы не дампятся стандартным pg_dump).
Маршрутизация запросов
Ключ шардинга -- doc_id (BIGINT). Монотонно растёт, поэтому range sharding естественен:
doc_id < 112,000,000 → secondlayer_prod (S1)
112M ≤ doc_id < 150,000,000 → secondlayer_prod_ft2 (S2)
150M ≤ doc_id < 175,000,000 → secondlayer_prod_ft3 (S3)
doc_id ≥ 175,000,000 → secondlayer_prod_ft4 (S4)
Backend маршрутизирует на уровне connection pool: 4 пула PgBouncer, каждый на свою базу. Для нового шарда -- добавить базу, пул и обновить range map.
Мониторинг: endpoint /api/internal/edrsr-stats собирает count со всех шардов через pg_class.reltuples (approximate count, O(1)) вместо COUNT(*) (sequential scan, O(n)).
Trade-offs
| Аспект | Плюс | Минус |
|---|---|---|
| Backup | Независимый per-shard (ft4 = 2 мин) | 4 отдельных cron jobs |
| VACUUM | Параллельный, меньшие таблицы | 4 autovacuum workers |
| Queries | Point lookup O(log n) | Cross-shard JOIN только в Node.js |
| Connections | Изолированные пулы | 4× connection overhead в PgBouncer |
| Ops | Можно дропнуть/перестроить один шард | Ручной range management |
Копирование на продакшн: двухфазный ETL
Перенести 60M строк (283 ГБ) с локального PG на 4 шарда продакшна через сеть -- отдельная инженерная задача. Скрипт copy-fulltext-to-prod.py реализует двухфазный подход.
Фаза 1: Export (sequential read → TSV chunks)
# Один streaming COPY из local PG → TSV-файлы на NVMe
export_sql = "\\COPY (SELECT doc_id, full_text FROM edrsr_fulltext "
f"WHERE {where} ORDER BY doc_id) TO STDOUT WITH (FORMAT text)"
proc = subprocess.Popen(LOCAL_CMD + ["-c", export_sql], stdout=PIPE)
for line in proc.stdout: # streaming, без накопления в памяти
current_file.write(line)
if line_count >= chunk_size: # default 5000 строк
rotate_to_next_chunk()
Почему TSV, а не CSV? COPY text format (TSV) -- native PostgreSQL формат. Не нужен CSV parsing на стороне приёма. Escaping проще: tab-separated, backslash-escaping.
Почему chunk files, а не pipe? Resumability. Если сеть упадёт на 70% upload -- restart подберёт неотправленные чанки. Каждый чанк = atomic unit of work.
I/O pattern: Sequential read из local PG (NVMe) → sequential write в /tmp/edrsr-ft-chunks/. Один поток, без конкуренции за диск.
Фаза 2: Upload (parallel workers → prod PG)
Pool(processes=200) # 200 параллельных psql-процессов
pool.imap_unordered(upload_chunk, chunk_files, chunksize=1)
Каждый воркер:
- Читает TSV-чанк с диска (~5000 строк, ~25 МБ)
- Формирует SQL:
CREATE TEMP TABLE→COPY FROM STDIN→INSERT ON CONFLICT→DROP TABLE - Выполняет через
subprocess.run(["psql", "-h", prod_host, ...]) - Парсит stdout на
INSERT 0 Nдля подсчёта скопированных - Удаляет чанк-файл после успеха
Почему psql subprocess, а не psycopg2? Python GIL. 200 тредов с psycopg2 сериализуются на GIL при обработке сетевых буферов. 200 subprocess -- это 200 отдельных процессов, каждый со своим TCP-соединением. Полная утилизация сетевой пропускной способности.
SET lock_timeout = '5min' на каждом чанке -- защита от deadlock при конкурентных INSERT в один шард.
Resumability: Чанки удаляются только после успешного INSERT. --skip-export позволяет перезапустить только фазу upload из имеющихся чанков. --resume-from-doc-id позволяет доэкспортировать новые данные к существующим чанкам.
Прогресс: каждые 200 чанков: copied, skipped (already exist), errors, rows/s, ETA.
Размер воркер-пула: почему 200?
Продакшн PostgreSQL: max_connections=500, PgBouncer в transaction mode. 200 воркеров = 200 concurrent connections. Каждый воркер держит соединение ~2-5 секунд (COPY + INSERT). При 200 workers и chunk_size=5000: throughput ~100K-200K rows/s, в зависимости от сетевой латентности.
500 воркеров -- oversaturation: PG начинает тротлить на lock contention (concurrent INSERT в тот же индекс). 100 воркеров -- недогрузка сети. 200 -- эмпирический оптимум для нашего EC2 t3.xlarge.
Data quality
| Метрика | Значение |
|---|---|
| RTF-конвертация: точность | 99.5% (manual validation, n=1000) |
| Покрытие по годам (2021-2026) | 94-97% |
| Gaps | 3-6% -- документы без RTF (только метаданные) |
| Дубликаты | 0 (ON CONFLICT DO NOTHING) |
| Encoding errors | <0.1% (surrogate replacement) |
3-6% gap -- это документы, для которых ЕГРСР не публикует полный текст (закрытые производства, решения с ограниченным доступом по ЗУ «О судоустройстве и статусе судей»).
Результаты
| Метрика | Значение |
|---|---|
| Полных текстов на проде | ~60,000,000 |
| Шардов | 4 (одна PG инстанция, EC2 t3.xlarge) |
| Общий размер | 283 ГБ (EBS gp3) |
| Индексы (B-tree PK) | ~2.8 ГБ per shard |
| Backup S4 (8 ГБ) | ~2 мин |
| Backup S1 (146 ГБ) | ~90 мин |
| Воркеров загрузки | 100 (asyncio) |
| Воркеров конвертации | 12 (multiprocessing) |
| Воркеров продакшн-копии | 200 (subprocess) |
| Pipeline idempotent | Да (ON CONFLICT DO NOTHING + file-level resume) |
Что дальше
Полные тексты -- это сырьё для двух следующих слоёв:
Векторные эмбеддинги. 60M × 1536 dim (text-embedding-ada-002) = ~350 ГБ в Qdrant. Это потребует batch-embedding pipeline с rate limiting (OpenAI TPM), chunking длинных текстов и incremental update strategy.
Semantic sectioning. Разбиение решений на логические секции (мотивировочная часть, резолютивная часть, особое мнение) для более точного поиска. SemanticSectionizer уже работает для отдельных документов, но batch-обработка 60M -- отдельный вызов.
Открытые данные -- это не компромисс. Это архитектурное решение. 60 миллионов полных текстов, 283 ГБ на 4 шардах, idempotent pipeline с нулевой толерантностью к потере данных -- и всё построено на публичных источниках, без зависимости от коммерческих API.