ETL

ETL-проект для начинающих Data Engineers: От почтового сервера до Greenplum

Дмитрий Кузьмин
#ETL#blog
Feature image

Привет! Меня зовут Дмитрий Кузьмин, я Data Engineer, веду свой

Блог в Telegram

по data engineering. Пишу про область, рабочие кейсы, немного лайфстайла. Будет интересно больше новичкам в области, но также поддержу разговор с middle+ инженерами. Буду рад, если в моем канале найдете для себя полезности.

Заходите в мой репозиторий - здесь вы найдете материалы и полезные статьи.

Также приглашаю вас на свой авторский курс по SQL - первый курс в серии “Инженерия данных”. Обилие задач, теория в конспектах и видео, отдельный закрытый телеграм канал для участников, оперативная помощь и консультация по прохождению.

В этой статье я хочу поделиться своим пет-проектом, который посвящен созданию ETL-процесса — это один из ключевых компонентов в работе любого Data Engineer. В моем случае проект направлен на извлечение данных из электронной почты, их преобразование и последующую загрузку в базу данных Greenplum для дальнейшего анализа и визуализации.

Идея создания этого проекта возникла у меня из реальной необходимости: я хотел лучше контролировать свои расходы в крупных продуктовых сетях, особенно в таких магазинах, как “ВкусВилл”. Основная задача заключалась в том, чтобы систематизировать свои покупки по категориям товаров, анализировать эти данные и видеть динамику расходов. Конечно, существуют приложения, которые уже умеют строить подобные графики, и даже сам “ВкусВилл” предоставляет такую возможность в своем личном кабинете. Но моя цель была гораздо глубже — я хотел создать собственную систему, где данные из разных магазинов могли бы агрегироваться в одном месте. Это позволило бы не только анализировать информацию о расходах, но и использовать её для более сложных вычислений и визуализаций.

Кроме того, я всегда хотел получить все эти данные в удобном и кастомизированном формате, который был бы легко настраиваемым под свои нужды. Готовые приложения часто не дают возможности гибко изменять категории товаров или детализировать данные до нужного уровня. Поэтому в рамках этого проекта я решил использовать такие инструменты, как Python для автоматизации процессов извлечения и обработки данных, и Greenplum в качестве базы данных для их хранения и последующего анализа.

В этой статье я расскажу, как, используя Python и базу данных Greenplum, мне удалось автоматизировать процесс извлечения данных из писем от магазина “ВкусВилл”, структурировать их и загрузить в базу для дальнейшей обработки. На этапе трансформации данных я разобрал, как можно извлечь ключевую информацию из писем (например, названия товаров, их количество, цену и общую стоимость заказа), а затем преобразовать эти данные в формат, удобный для дальнейшей аналитики.

Проект охватывает весь жизненный цикл данных, начиная с их извлечения из внешних источников (электронная почта), обработки и преобразования в удобный формат, и заканчивая их загрузкой в мощную аналитическую базу данных, где можно запускать сложные запросы и визуализировать результаты.

Итак, начнем.


Введение

Используемые инструменты:

Подготовительные шаги:


EXTRACT

Здесь происходит извлечение данных из почтового ящика. Подключаемся к почтовому серверу, извлекаем непрочитанные письма и сохраняем их для дальнейшей обработки.

1. Подключение к почтовому серверу

import imaplib
import email
from email.header import decode_header
import base64
import mail_config

# Загрузка данных для подключения из конфигурационного файла
mail_pass = mail_config.mail_pass  # Пароль почтового ящика
username = mail_config.username  # Имя пользователя (логин)
imap_server = "imap.mail.ru"  # Адрес почтового сервера

# Подключение к серверу через SSL (защищенное соединение)
imap = imaplib.IMAP4_SSL(imap_server)

# Логинимся на почтовый сервер с помощью логина и пароля
imap.login(username, mail_pass)

# Выбираем папку с входящими письмами для работы
imap.select("INBOX")

2. Извлечение непрочитанных писем

# Поиск всех непрочитанных писем в почтовом ящике
unseen_mails = imap.search(None, 'UNSEEN')  

# Преобразуем результат поиска в строку для дальнейшей обработки
unseen_mails_str = str(unseen_mails[1])

# Выводим список ID непрочитанных писем
print('Непрочитанные письма: ', parsing_list_unseen_email(unseen_mails_str))

Пара комментариев по блоку выше:

3. Парсинг непрочитанных писем

def parsing_list_unseen_email(unseen_emails: str):
    """
    Функция для получения списка ID непрочитанных писем.
    :param unseen_emails: Строка с ID непрочитанных писем.
    :return: Список ID писем.
    """
    lst_id_unseen_emails = []  # Пустой список для ID
    digit_char = ''  # Переменная для временного хранения цифр
    for char in unseen_emails:
        # Если текущий символ — цифра, добавляем его к временной строке
        if char.isdigit():
            digit_char += char
        else:
            # Если цифры закончились, сохраняем их как ID и обнуляем временную строку
            if digit_char:
                lst_id_unseen_emails.append(digit_char)
                digit_char = ''
    return lst_id_unseen_emails  # Возвращаем итоговый список ID

Снова комментарии:

4. Извлечение конкретного письма

# Извлекаем одно из писем по его ID
res, msg = imap.fetch(b'3123', '(RFC822)')  # Здесь 3123 — это ID письма
msg = email.message_from_bytes(msg[0][1])  # Преобразуем байты в объект сообщения

# Читаем и выводим заголовок письма (например, тему)
print('\nЗаголовок письма:\n', decode_header(msg["Subject"])[0][0].decode())

5. Извлечение тела письма

def extract_multipart(msg):
    """
    Функция для извлечения содержимого письма с возможностью обработки вложенных сообщений.
    :param msg: Объект сообщения.
    """
    with open('D:/Mail_read_files/email_body.txt', 'w', encoding='utf-8') as f:
        # Если письмо многокомпонентное (содержит вложенные элементы)
        if msg.is_multipart():
            for part in msg.walk():  # Проходим по всем частям письма
                # Если часть письма — это текст (HTML-формат)
                if part.get_content_type() == 'text/html':
                    # Раскодируем содержимое и записываем его в файл
                    f.write(base64.b64decode(part.get_payload()).decode())
        else:
            # Если письмо не содержит вложенных частей, просто записываем его содержимое
            f.write(base64.b64decode(msg.get_payload()).decode())
    print('[INFO] Файл email_body.txt создан')

TRANSFORM

После того как мы извлекли содержимое письма, переходим к его анализу. В проекте в этом примере я рассматриваю письма от магазина “Вкусвилл”, содержащеи информацию о покупках.

1. Парсинг HTML-содержимого

import pandas as pd

# Инициализация пустых списков для хранения данных
st = {'product_name': [], 'count': [], 'price': [], 'total_price': [], 'order_date': [], 'shop_name': []}

def transform_data_vkusvill():
    """
    Парсинг письма от магазина "Вкусвилл" для извлечения данных о покупках.
    """
    with open(r'D:/Mail_read_files/email_body.txt', 'r', encoding='utf-8') as f:
        lines = f.readlines()  # Чтение всех строк файла
        for i, line in enumerate(lines):
            # Ищем строку с названием магазина
            if 'АО "Вкусвилл"' in line:
                shop_name = 'АО "Вкусвилл"'  # Сохраняем название магазина
                # Дата заказа находится на 14-й строке после найденного названия
                order_date = lines[i + 14].split('<')[0].strip()
            # Ищем строку с информацией о товаре
            if 'width="40%"' in line:
                st['order_date'].append(order_date)  # Сохраняем дату заказа
                st['shop_name'].append(shop_name)  # Сохраняем название магазина
                
                # Парсинг названия товара
                product_line = lines[i + 2]
                if ',кг' in product_line or ',шт' in product_line:
                    product_name = product_line.split(',')[0].strip()  # Название товара
                    st['product_name'].append(product_name)
                
                # Парсинг цены, количества и общей суммы
                st['price'].append(float(lines[i + 4].replace(',', '.')))
                st['count'].append(float(lines[i + 6].replace(',', '.')))
                total_price = float(lines[i + 10].split('<')[0].replace(',', '.'))
                st['total_price'].append(total_price)

transform_data_vkusvill()  # Вызов функции для парсинга

Пара слов про блок:

2. Преобразование в DataFrame и сохранение в CSV

# Преобразование данных в DataFrame
df = pd.DataFrame(st)

# Сохранение DataFrame в CSV для дальнейшей загрузки
df.to_csv('D:/Mail_read_files/email_body.csv', sep=';', encoding='utf-8', index=False)

LOAD

На финальном этапе я загружаем данные из CSV - файла в временную таблицу базы данных Greenplum.

1. Загрузка данных в базу данных

import psycopg2
import mail_config

def load_data_to_temp_table():
    """
    Загрузка данных из CSV в временную таблицу Greenplum.
    """
    answer = input('Точно загрузить новую пачку данных? Напишите "да" или "нет": ')
    
    # Проверка на подтверждение действия пользователя
    if answer.lower() == 'да':
        # Подключаемся к базе данных Greenplum
        with psycopg2.connect(
            database=mail_config.db_name,
            user=mail_config.user,
            password=mail_config.password,
            host=mail_config.host,
            port=mail_config.port
        ) as conn:
            # Открываем курсор для выполнения SQL-запросов
            with conn.cursor() as cur, open('D:/Mail_read_files/email_body.csv', 'r', encoding='utf-8') as file:
                # Загружаем данные из CSV в таблицу базы данных
                cur.copy_from(file, 'email_body_temp', sep=';')
                conn.commit()  # Подтверждаем транзакцию
                print("Данные успешно загружены")
    else:
        print("Загрузка отменена")

load_data_to_temp_table()  # Вызов функции для загрузки данных

Это был мой первый пет-проект, в котором мне пришлось сразу использовать несколько ключевых инструментов, необходимых для работы Data Engineer. Я понимаю, что статья может показаться сложной, но не стоит пугаться. Попробуйте разобрать код построчно и проанализировать каждый шаг — это отличный способ лучше понять, как все работает. Обычно после такого подхода многие моменты становятся более ясными и понятными.

Я постарался снабдить код подробными комментариями, чтобы помочь вам лучше понять каждый шаг процесса. Надеюсь, это поможет вам в освоении подобных задач.

Этот проект — только первый шаг, базовый этап извлечения данных из почты. В следующей статье я расскажу, как автоматизировать процесс, поставив его на расписание, а также покажу, как подключить BI-инструменты для создания визуализаций, чтобы вы могли видеть свои данные в удобной и красивой форме.

← Back to Blog