Оптимизация стратегий на 100+ тикерах: Multiprocessing в Python

Оптимизация стратегий на 100+ тикерах: Multiprocessing в Python Анализ данных и Бэктесты
Узнайте, как эффективно оптимизировать параметры торговых стратегий для сотен тикеров одновременно, используя многопроцессорность в Python. Подробное руководство с примером кода.
Суть: Оптимизация параметров торговых стратегий для множества активов одновременно достигается за счет распараллеливания вычислений с использованием модуля multiprocessing в Python. Это позволяет значительно сократить время бэктестинга и поиска оптимальных параметров для 100+ тикеров, распределяя задачи по доступным ядрам процессора.

Исходный код

Представленный ниже код демонстрирует полный процесс оптимизации параметров простой стратегии скользящих средних (Moving Average Crossover) для более чем 100 фиктивных тикеров. Мы используем модуль multiprocessing для параллельного выполнения задач, что значительно ускоряет процесс. Для наглядности и отслеживания прогресса используется библиотека tqdm.

Для работы с этим кодом вам потребуются следующие библиотеки: pandas, numpy, itertools, multiprocessing и tqdm. Установите их, если они еще не установлены: pip install pandas numpy tqdm.


import pandas as pd
import numpy as np
import itertools
import multiprocessing
import time
from tqdm import tqdm # Для отображения прогресса

# 1. Функция для генерации фиктивных данных
def generate_dummy_data(ticker, start_date, end_date):
    """
    Генерирует фиктивные OHLCV данные для заданного тикера.
    """
    dates = pd.date_range(start=start_date, end=end_date, freq='D')
    n_days = len(dates)
    
    # Симулируем случайное блуждание для цены закрытия
    np.random.seed(hash(ticker) % (2**32 - 1)) # Устанавливаем seed для воспроизводимости по тикеру
    prices = 100 + np.cumsum(np.random.randn(n_days))
    
    df = pd.DataFrame({
        'Open': prices,
        'High': prices + np.random.rand(n_days) * 2,
        'Low': prices - np.random.rand(n_days) * 2,
        'Close': prices + np.random.randn(n_days) * 0.5,
        'Volume': np.random.randint(10000, 100000, n_days)
    }, index=dates)
    
    df['Close'] = df['Close'].apply(lambda x: max(x, 1.0)) # Гарантируем положительные цены
    return df

# 2. Функция для бэктестинга стратегии (MA Crossover)
def backtest_strategy(df, short_window, long_window):
    """
    Бэктестит простую стратегию MA Crossover и возвращает Sharpe Ratio.
    """
    if short_window >= long_window or short_window <= 0 or long_window <= 0:
        return -np.inf # Невалидные параметры

    df['SMA_Short'] = df['Close'].rolling(window=short_window, min_periods=1).mean()
    df['SMA_Long'] = df['Close'].rolling(window=long_window, min_periods=1).mean()

    # Генерируем сигналы
    df['Signal'] = 0
    # Сигнал на покупку, когда короткая MA пересекает длинную MA снизу вверх
    df.loc[df['SMA_Short'] > df['SMA_Long'], 'Signal'] = 1 
    # Сигнал на продажу, когда короткая MA пересекает длинную MA сверху вниз
    df.loc[df['SMA_Short'] < df['SMA_Long'], 'Signal'] = -1 

    # Рассчитываем ежедневную доходность стратегии
    # Сдвигаем сигнал на 1 день, чтобы торговать на следующий день после его появления
    df['Strategy_Returns'] = df['Close'].pct_change() * df['Signal'].shift(1)

    # Рассчитываем Sharpe Ratio (годовой)
    # Предполагаем ежедневные данные, 252 торговых дня в году
    annualized_returns = df['Strategy_Returns'].mean() * 252
    annualized_std = df['Strategy_Returns'].std() * np.sqrt(252)

    if annualized_std == 0 or np.isnan(annualized_std):
        return -np.inf # Избегаем деления на ноль, нет волатильности = нет осмысленной стратегии

    sharpe_ratio = annualized_returns / annualized_std
    return sharpe_ratio

# 3. Функция для оптимизации параметров для одного тикера
def optimize_ticker_params(args):
    """
    Выполняет поиск оптимальных параметров для одного тикера по заданной сетке.
    Возвращает имя тикера, лучшие параметры и соответствующий Sharpe Ratio.
    """
    ticker_name, df, param_grid = args
    
    best_sharpe = -np.inf
    best_params = None
    
    for short_w, long_w in param_grid:
        sharpe = backtest_strategy(df.copy(), short_w, long_w)
        if sharpe > best_sharpe:
            best_sharpe = sharpe
            best_params = (short_w, long_w)
            
    return ticker_name, best_params, best_sharpe

if __name__ == '__main__':
    # Параметры для генерации данных
    num_tickers = 100 # Количество тикеров для демонстрации
    tickers = [f'TICKER_{i:03d}' for i in range(num_tickers)]
    start_date = '2010-01-01'
    end_date = '2023-12-31'

    print(f"Генерация фиктивных данных для {num_tickers} тикеров...")
    all_ticker_data = {}
    for ticker in tqdm(tickers, desc="Генерация данных"):
        all_ticker_data[ticker] = generate_dummy_data(ticker, start_date, end_date)
    print("Данные сгенерированы.")

    # Диапазоны параметров для оптимизации
    # Пример: короткие MA от 5 до 20 с шагом 5, длинные MA от 20 до 60 с шагом 10
    short_windows = range(5, 21, 5)  # [5, 10, 15, 20]
    long_windows = range(20, 61, 10) # [20, 30, 40, 50, 60]
    param_grid = list(itertools.product(short_windows, long_windows))

    print(f"Начинаем оптимизацию параметров для {num_tickers} тикеров с {len(param_grid)} комбинациями на каждый...")

    # Подготовка аргументов для multiprocessing
    # Каждый элемент списка будет кортежем (ticker_name, df, param_grid)
    tasks = [(ticker, all_ticker_data[ticker], param_grid) for ticker in tickers]

    # Использование multiprocessing
    # Определяем количество процессов. Обычно используют os.cpu_count() или os.cpu_count() - 1
    num_processes = multiprocessing.cpu_count() - 1 if multiprocessing.cpu_count() > 1 else 1
    print(f"Используем {num_processes} процессов для параллельной оптимизации.")

    start_time = time.time()
    
    results = []
    with multiprocessing.Pool(processes=num_processes) as pool:
        # Используем tqdm для отображения прогресса выполнения задач в пуле
        for result in tqdm(pool.imap_unordered(optimize_ticker_params, tasks), total=len(tasks), desc="Оптимизация тикеров"):
            results.append(result)

    end_time = time.time()

    print(f"\nОптимизация завершена за {end_time - start_time:.2f} секунд.")

    # Вывод лучших параметров для каждого тикера
    print("\nЛучшие параметры и Sharpe Ratio для каждого тикера:")
    for ticker, params, sharpe in sorted(results, key=lambda x: x[0]):
        if params:
            print(f"  {ticker}: Short={params[0]}, Long={params[1]}, Sharpe Ratio={sharpe:.4f}")
        else:
            print(f"  {ticker}: Не удалось найти оптимальные параметры (Sharpe Ratio: {sharpe:.4f})")

    # Пример агрегированного результата (например, средний Sharpe Ratio)
    valid_sharpes = [r[2] for r in results if r[2] != -np.inf]
    if valid_sharpes:
        print(f"\nСредний Sharpe Ratio по всем оптимизированным тикерам: {np.mean(valid_sharpes):.4f}")
    else:
        print("\nНе удалось найти ни одного валидного Sharpe Ratio.")

Разбор параметров

  • generate_dummy_data(ticker, start_date, end_date): Функция для создания синтетических исторических данных. В реальных условиях вы будете загружать данные из базы данных или файлов.
  • backtest_strategy(df, short_window, long_window): Основная функция, которая принимает DataFrame с данными тикера и параметры стратегии (длины скользящих средних). Она рассчитывает сигналы, доходность и возвращает метрику эффективности, в данном случае — годовой Sharpe Ratio. Подобные подходы к бэктестингу могут быть расширены для более сложных моделей, как описано в статье Бэктестинг стратегии на скрытых марковских моделях (HMM) в Python.
  • optimize_ticker_params(args): Эта функция является основной рабочей единицей для каждого процесса. Она принимает кортеж (ticker_name, df, param_grid), итерирует по всем комбинациям параметров в param_grid, вызывает backtest_strategy для каждой комбинации и находит лучшие параметры для данного тикера.
  • num_tickers: Количество тикеров, для которых будет производиться оптимизация. Увеличьте это значение, чтобы увидеть преимущества многопроцессорности.
  • short_windows и long_windows: Диапазоны значений для коротких и длинных скользящих средних. itertools.product используется для создания всех возможных комбинаций из этих диапазонов, формируя param_grid.
  • multiprocessing.Pool(processes=num_processes): Создает пул рабочих процессов. num_processes обычно устанавливается равным количеству ядер процессора для максимальной эффективности.
  • pool.imap_unordered(optimize_ticker_params, tasks): Распределяет задачи по оптимизации между процессами в пуле. imap_unordered позволяет получать результаты по мере их готовности, а не в порядке отправки задач, что часто быстрее.
  • tqdm: Библиотека для отображения прогресс-баров в консоли, что очень удобно при выполнении длительных операций.

Как запустить

Для запуска скрипта выполните следующие шаги:

  1. Установите необходимые библиотеки: Откройте терминал или командную строку и выполните команду:
    
    pip install pandas numpy tqdm
    
  2. Сохраните код: Скопируйте весь предоставленный Python-код и сохраните его в файл с расширением .py, например, multi_ticker_optimization.py.
  3. Запустите скрипт: В терминале перейдите в директорию, где вы сохранили файл, и выполните команду:

    
    python multi_ticker_optimization.py
    
  4. Анализируйте результаты: Скрипт выведет в консоль информацию о ходе генерации данных, процессе оптимизации (с прогресс-баром) и, наконец, список лучших параметров и Sharpe Ratio для каждого тикера, а также средний Sharpe Ratio по всем оптимизированным активам.

Этот подход может быть адаптирован для более сложных стратегий и метрик. Например, для анализа взаимосвязей между активами перед оптимизацией можно использовать методы, описанные в статье Скользящая корреляция BTC и S&P 500 на Python в Pandas, или для группировки активов по схожим характеристикам, как в Кластеризация криптовалют по волатильности на Python с K-Means, что может помочь в более целенаправленной оптимизации.

Оцените статью
FinFluct