Ответ
Да, создавал. Например, я разработал пакет etl_helpers для стандартизации ETL-задач в нашей команде. Он содержал общие connectors к источникам данных, шаблоны для инкрементальной загрузки и утилиты для логирования и обработки ошибок.
Процесс создания и ключевые решения:
- Структура проекта: Использовал структуру
src/, чтобы избежать путаницы между пакетом и рабочим каталогом. Основные модули:connectors,incremental,monitoring. - Управление зависимостями: В
pyproject.tomlчётко разделил зависимости наdependencies(например,pandas,sqlalchemy) иdev-dependencies(pytest,black,mypy). - Тестирование: Настроил
pytestс фикстурами для тестирования connectors на mock-серверах (например,pytest-httpserver). - Сборка и публикация: Автоматизировал сборку через
python -m buildи публикацию в наш приватный PyPi-репозиторий (Artifactory) с помощью GitHub Actions.
Пример содержимого pyproject.toml:
[project]
name = "etl-helpers"
version = "1.2.0"
authors = [{name = "Your Name", email = "name@example.com"}]
description = "Internal ETL utilities for data pipelines."
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"pandas>=1.5",
"sqlalchemy>=2.0",
"psycopg2-binary",
"requests",
]
[project.optional-dependencies]
dev = ["pytest", "black", "mypy", "pytest-httpserver"]
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
Пример кода из модуля инкрементальной загрузки:
# src/etl_helpers/incremental.py
from datetime import datetime
from typing import Any, Dict
import hashlib
class IncrementalLoader:
"""Класс для управления состоянием инкрементальной загрузки."""
def __init__(self, state_storage: Dict[str, Any]):
self.state = state_storage
def get_last_processed_id(self, table_name: str) -> int:
"""Возвращает последний обработанный ID для таблицы."""
return self.state.get(f"last_id_{table_name}", 0)
def update_state(self, table_name: str, last_id: int, checksum: str = None):
"""Обновляет состояние после успешной загрузки."""
self.state[f"last_id_{table_name}"] = last_id
self.state[f"updated_at_{table_name}"] = datetime.utcnow().isoformat()
if checksum:
self.state[f"checksum_{table_name}"] = checksum
Пакет позволил унифицировать код в разных пайплайнах и упростил онбординг новых разработчиков.
Ответ 18+ 🔞
А, ну это я как раз делал, ёпта! Создавал, да. Вот, например, наваял пакет etl_helpers, чтобы всю нашу команду от ебалы с однотипными задачами отмазать. Там внутри всё для ETL-задач: коннекторы к разным источникам, шаблоны для инкременталки и утилиты, чтобы логи писать и ошибки ловить.
Как я это делал и на чём спотыкался:
- Структура проекта: Сделал через
src/, чтобы не было этой пиздопроебибны, когда пакет с рабочим каталогом путаешь. Основные модули:connectors,incremental,monitoring. - Зависимости: В
pyproject.tomlвсё чётко разложил: вот это (pandas,sqlalchemy) — для работы, а вот это (pytest,black) — для разработки, чтобы не тащить в продакшен лишнего говна. - Тестирование: Настроил
pytest, сделал фикстуры, чтобы connectors на mock-серверах гонять. Без этого доверия ебать ноль, что ничего не сломается. - Сборка и публикация: Заавтоматизировал всё: сборка через
python -m build, а публикация в наш приватный PyPi (Artifactory) — через GitHub Actions. Чтобы не тыкать руками каждый раз, ядрёна вошь.
Вот, смотри, как в pyproject.toml примерно выглядело:
[project]
name = "etl-helpers"
version = "1.2.0"
authors = [{name = "Your Name", email = "name@example.com"}]
description = "Internal ETL utilities for data pipelines."
readme = "README.md"
requires-python = ">=3.9"
dependencies = [
"pandas>=1.5",
"sqlalchemy>=2.0",
"psycopg2-binary",
"requests",
]
[project.optional-dependencies]
dev = ["pytest", "black", "mypy", "pytest-httpserver"]
[build-system]
requires = ["setuptools>=61.0", "wheel"]
build-backend = "setuptools.build_meta"
А вот кусок кода из модуля, который за инкрементальную загрузку отвечал:
# src/etl_helpers/incremental.py
from datetime import datetime
from typing import Any, Dict
import hashlib
class IncrementalLoader:
"""Класс для управления состоянием инкрементальной загрузки."""
def __init__(self, state_storage: Dict[str, Any]):
self.state = state_storage
def get_last_processed_id(self, table_name: str) -> int:
"""Возвращает последний обработанный ID для таблицы."""
return self.state.get(f"last_id_{table_name}", 0)
def update_state(self, table_name: str, last_id: int, checksum: str = None):
"""Обновляет состояние после успешной загрузки."""
self.state[f"last_id_{table_name}"] = last_id
self.state[f"updated_at_{table_name}"] = datetime.utcnow().isoformat()
if checksum:
self.state[f"checksum_{table_name}"] = checksum
В общем, пакет получился — огонь. Перестали все в десяти местах один и тот же велосипед пилить, и новых ребят подключать стало в разы проще. Не то чтобы волнение ебать, но приятно, когда твой костыль всю команду на ноги ставит.