바이낸스 Historical Market Data와 REST API 캔들 데이터 합치기

대부분 검색을 통해서 바이낸스 Candle 데이터를 REST API를 통해서 가져올 수 있다는 것을 알고 있을 것이다. 그런데 왜 지금까지 바이낸스 Historical Market Data 에서 zip 파일형태로 Candle 데이터를 다운 받아 1개의 CSV파일로 합치는 작업을 했던 것일까?

바이낸스 REST API로 선물(Futures)이나 현물(Spot) 캔들 데이터를 가져오는 경우 바이낸스 WAF(웹 방화벽)에 의해 IP단위로 API 호출 횟수 제한을 받게 된다. 현재 바이낸스 선물에 적용된 WAF 정책은 1분당 2,400의 가중치(Weight)를 허용하는데 API로 1000개씩의 캔들을 가져온다면 1분에 약 240번 정도 가져오는 것이 가능한 것으로 계산하면 된다. (1번 호출 = 가중치 10 소모)

# https://binance-docs.github.io/apidocs/futures/en/#exchange-information
# 바이낸스의 API가중치는 갈수록 늘어나는게 아니라 줄어들고 있다. 18년도쯤엔 6,000이었던거 같은데...
# GET /fapi/v1/exchangeInfo 에서 REQUEST_WEIGHT 값으로 2,400을 확인할 수 있다.
{
    "exchangeFilters": [],
    "rateLimits": [
        {
            "interval": "MINUTE",
            "intervalNum": 1,
            "limit": 2400,
            "rateLimitType": "REQUEST_WEIGHT" 
        },
        {
            "interval": "MINUTE",
            "intervalNum": 1,
            "limit": 1200,
            "rateLimitType": "ORDERS"
        }
    ],
    ... (생략)
}

 

문제는, 1개의 코인에 대한 캔들을 1분동안 240,000개 가져오고 나면 API호출이 차단되어 5분간 HTTP 4xx에러를 보게 된다는 것이다. (참고링크)

따라서 API가 차단되지 않는 수준에서 2020년 1월 1일 이후의 바이낸스 비트코인 선물데이터를 (약 2,025,000개) 모두 가져오려면 1분에 200,000개 정도씩을 가져올 수 있기 때문에 10분 넘는 시간이 필요해진다. (앞서 작업했던 다운로드 방식을 찾았던 이유가 이것 때문이었다.) 

 

여기에 더해 관심 있는 몇개 코인의 과거 1분지표 캔들 데이터까지 모두 가져오려면....^^ (1분 1초가 아까운 이시기에 기다릴 수 있을까?)

 

이제 어떤 이유로 파일로 Historical Market Data를 다운받아 최소의 REST API 호출로 캔들 데이터를 가져오려고 했는지  이해가 될 것이다.

REQUEST_WEIGHT
Binance REQUEST_WEIGHT

이제 앞서 포스팅했던 파이썬 스크립트에 기능을 몇개만 추가하면 2,400 REQUEST_WEIGHT API 제한에 걸리지 않고 원하는 만큼의 캔들 데이터를 얻을 수 있다. 1분도 걸리지 않아 [ 2백만개 x Symbol 종류 ] 만큼의 Candle 데이터를 얻을 수 있다!)

 

바이낸스 비트코인 선물 Kline/Candle 파일로 다운받아 합치기

 

바이낸스 비트코인 선물 Kline/Candle 파일로 다운받아 합치기

Python으로 바이낸스 3년간의 비트코인 선물 1분지표 Candle 데이터를 파일로 빠르게 다운받아 머신러닝에 필요한 데이터를 얻어 낼 수 있게 되었다. 하지만 zip 파일로 다운로드만 빠르게 되었을뿐,

axgo.tistory.com

 

1.  바이낸스 선물 REST API를 최소로 호출하자

REST API로 바이낸스 선물 Candle 데이터를 가져 오기 위한 기능을 먼저 만들어 보자. 바이낸스의 Public REST API를 사용해 가져오면 되기 때문에 api-key등의 인증 정보가 없어도 된다. (로그인을 하진 않지만 IP를 추적당하게 된다...)

 

바이낸스 선물(Futures) Candle 데이터를 REST API로 가져오기 위한 Endpoint url은 fapi.binance.com/fapi/v1/klines 이다.

현물(Spot) Candle 데이터를 가져올때 사용하는 REST API는 Endpoint url은  api.binance.com/api/v3/klines 을 사용하면 된다. (바이낸스는 요소 요소 꼼꼼하게 API가 잘 되어 있다)

 

1-1. REST API 요청 파라메터와 Candle 데이터 응답 값 

Candle 데이터를 가져올때 필요한 Param(변수) 를 먼저 확인하자.

URL endpoint: GET /fapi/v1/klines

Name Type Mandatory Description
symbol STRING YES eg. BTCUSDT, ETHUSDT, PEPE1000USDT 등등이다.
interval ENUM YES eg. 1m, 5m, 1h -> 1분주기, 5분주기 등 지표의 인터벌이다.
startTime LONG NO 13자리의 시작시간을 전달한다. Python의 timestamp에 1000을 곱해서 만들면 된다.
endTime LONG NO 13자리의 종료시간을 전달한다. Python의 timestamp에 1000을 곱해서 만들면 된다.
limit INT NO Default 500; max 1500. 몇개의 Candle을 요청하는 것인지 알려줄때 사용하는 값이다.

 

Python에서 requests 를 이용해 Parameter에 해당하는 값들을 바이낸스에 REST API로 요청하면 JSON 형태로 아래와 같은 응답을 받을 수 있다.

[
  [
    1499040000000,      // Open time
    "0.01634790",       // Open
    "0.80000000",       // High
    "0.01575800",       // Low
    "0.01577100",       // Close
    "148976.11427815",  // Volume
    1499644799999,      // Close time
    "2434.19055334",    // Quote asset volume
    308,                // Number of trades
    "1756.87402397",    // Taker buy base asset volume
    "28.46694368",      // Taker buy quote asset volume
    "17928899.62484339" // Ignore.
  ]
]

캔들의 OHLCV에 해당하는 정보와 시작시간, 종료시간 등등 뭐가 좀 많지만 나에게 필요한 컬럼은 Open Time, Open, HIgh, Low, Close, Low, Volume 정보만 있으면 된다.

 

1-2. 파이썬 Requests로 REST API 요청을 처리할 기능을 추가하자

limit설정을 1500으로 캔들을 1500개씩 처리해 받을수도 있지만 계산의 편의를 위해 1000개를 처리하는 것으로 기본값을 잡는다.

응답값의 opentime 에 들어 있는 timestamp는 타임존 정보가 없지만 UTC+0 시간이다.

Pandas 데이터프레임으로 넣을때 utc정보를 설정하도록해야 타임존을 나의 시간에 맞추어 사용할 수 있다. 

def binance_klines(market_type="um", symbol="BTCUSDT", interval="1m", start_time=None, end_time=None, limit=1000):
    if market_type == ["um", "cm"]:
      url = "https://fapi.binance.com/fapi/v1/klines"
    else:
      url = "https://api.binance.com/api/v3/klines"
    params = {
        "symbol": symbol,
        "interval": interval,
        "startTime": start_time,
        "endTime": end_time,
        "limit": limit
    }
    res = requests.get(url, params=params)
    value = res.json()
    df = pd.DataFrame(value)
    df = df.iloc[:, :6]
    df.columns = ['datetime', 'open','high', 'low', 'close', 'volume']
    df.index = pd.to_datetime(df['datetime'], unit='ms', utc=True)
    df = df.astype(float)
    df = df.tz_convert('Asia/Seoul')
    return df

 

실행을 하면 이렇게 UTC+9 시간을 인덱스로 사용하는 판다스 데이터프레임을 만들어 리턴해준다.

>>> binance_klines()
                               datetime     open     high      low    close   volume
datetime
2023-11-07 09:23:00+09:00  1.699317e+12  35030.8  35030.9  35022.5  35022.6   44.412
2023-11-07 09:24:00+09:00  1.699317e+12  35022.6  35025.0  34993.2  34993.2   87.696
2023-11-07 09:25:00+09:00  1.699317e+12  34993.3  34993.3  34978.0  34981.8  134.993
2023-11-07 09:26:00+09:00  1.699317e+12  34981.9  34981.9  34960.9  34964.1  187.646
2023-11-07 09:27:00+09:00  1.699317e+12  34964.2  34981.0  34959.2  34959.2  134.615
...                                 ...      ...      ...      ...      ...      ...
2023-11-08 01:58:00+09:00  1.699376e+12  34699.2  34699.3  34688.2  34691.9   46.638
2023-11-08 01:59:00+09:00  1.699376e+12  34692.0  34692.0  34676.0  34682.8   48.127
2023-11-08 02:00:00+09:00  1.699376e+12  34682.8  34682.9  34662.1  34671.2  121.867
2023-11-08 02:01:00+09:00  1.699376e+12  34671.2  34671.2  34645.5  34645.5  115.375
2023-11-08 02:02:00+09:00  1.699377e+12  34645.6  34650.0  34627.3  34628.0  146.780

[1000 rows x 6 columns]

 

1-2. 시작시간을 주면 Now()까지의 Candle 데이터를 가져오는 기능을 만들자

이제 시작시간(Timestamp)을 주면 현재시간(Now)까지의 Candle 데이터를 REST API로 받아오는 기능을 추가해보자.

앞선 포스팅에서 klines_history 의 기능을 만들때 마지막 데이터의 Timestmap을 반환하도록 했었다. 그 Timestamp에 1분을 추가해 인수를 전달하면 그 시간부터 가장 최근시간까지의 Candle 데이터를 판다스 데이터프레임으로 리턴하도록 하면 된다.

def klines_recent(market_type="um", symbol="BTCUSDT", interval="1m", timestamp=None):
  now = datetime.now()
  recent = pd.DataFrame()
  if not timestamp:
    day_ago = now - timedelta(days=2)
    timestamp = int(day_ago.timestamp() * 1000)
  while timestamp < now.timestamp() * 1000:
    df = binance_klines(market_type, symbol, interval, start_time=timestamp, limit=1000)
    recent = pd.concat([recent, df])
    timestamp += 60000 * 1000
  recent_file_path = join(STORE_PATH, f'{market_type}_{symbol}_{interval}_recent.csv')
  print(f"### Save Recents Klines/Candles (REST API) {format(len(recent), ',')} :", recent_file_path)
  recent.to_csv(recent_file_path, index=False)
  return recent, recent_file_path, recent['datetime'].iloc[-1]

 

실행하면 전달한 시간에서 현재시간까지 binance_klines를 필요한 만큼 반복해 Candle 데이터를 받아온다.

받아온 캔들 데이터의 데이터프레임과 binance_klines 처럼 지정된 STORE_PATH 경로에 1m_recent.csv파일을 저장한 경로, 마지막 캔들의 Timestamp까지 3개의 정보를 리턴하도록 했다.

>>> klines_recent()
### Save Recents Klines/Candles (REST API) 2,880 : /Users/name/binance_data/1m_recent.csv
                               datetime     open     high      low    close   volume
datetime
2023-11-06 02:21:00+09:00  1.699205e+12  35102.8  35186.8  35102.8  35170.0  434.368
2023-11-06 02:22:00+09:00  1.699205e+12  35170.0  35170.0  35134.0  35134.1  315.317
2023-11-06 02:23:00+09:00  1.699205e+12  35134.0  35144.8  35101.1  35103.7  315.427
2023-11-06 02:24:00+09:00  1.699205e+12  35103.6  35118.9  35085.4  35100.0  259.511
2023-11-06 02:25:00+09:00  1.699205e+12  35100.0  35135.3  35100.0  35133.4  190.859
...                                 ...      ...      ...      ...      ...      ...
2023-11-08 02:16:00+09:00  1.699377e+12  34733.0  34734.3  34715.2  34734.2  196.926
2023-11-08 02:17:00+09:00  1.699377e+12  34734.3  34772.5  34734.2  34771.3  585.448
2023-11-08 02:18:00+09:00  1.699377e+12  34771.4  34798.4  34750.0  34752.0  820.787
2023-11-08 02:19:00+09:00  1.699378e+12  34752.4  34782.3  34750.7  34773.2  204.823
2023-11-08 02:20:00+09:00  1.699378e+12  34773.2  34790.0  34773.2  34781.9  153.102

[2880 rows x 6 columns], '/Users/name/binance_data/1m_recent.csv', 1699377600000.0)

 

2. 과거 캔들 데이터와 REST API로 받은 데이터를 합치자. (Merge Data)

History 데이터와 Recent 데이터의 형식은 동일하게 datetime, open, high, low, close, volume으로 6개의 컬럼으로 되어 있다. 이제 과거 캔들 데이터부터 현재까지의 캔들 데이터를 판다스에서 concat을 시켜주기만 하면 원하던 결과를 얻을 수 있다.

 

2-1.  klines_merge 기능을 만들자

지금까지 기능을 만든 순서대로 동작을 시키면 작업이 완료 된다. 동작할 순서를 보자.

  1. Zip Download: 바이낸스 Historycal Market Data에서 zip파일을 다운로드 한다.
  2. Unzip: 다운받은 zip파일을 압축 해제 한다.
  3. History: csv파일들을 하나의 단일 csv파일로 만들고 마지막 캔들의 timestmap를 반환한다.
  4. Recent: 마지막 캔들의 timestamp이후부터의 candle데이터를 REST API로 받는다.
  5. Merge: history 데이터와 recent 데이터를 하나의 데이터프레임으로 만들고 단일 csv로 저장한다.

klines_merge 기능을 만들어 순서대로 동작하도록 Python 기능을 추가 하자.

def klines_merge(market_type="um", symbol="BTCUSDT", interval="1m"):
  download_binance_datas(market_type, symbol, interval)
  klines_unzip(market_type, symbol, interval, search_directory=STORE_PATH)
  history_df, history_file_path, history_last_timestamp = klines_history(market_type, symbol, interval, STORE_PATH)
  recent_df, recent_file_path, recent_last_stamp = klines_recent(market_type, symbol, interval, timestamp=int(history_last_timestamp + 60000))
  df = pd.concat([history_df, recent_df])
  all_data_path = join(STORE_PATH, f'{market_type}_{symbol}_{interval}_merge.csv')
  df.to_csv(all_data_path, index=False)
  print(f"### Save Data Range {format(len(df), ',')} Candles : {df.index[0].strftime('%Y-%m-%d %H:%M')} ~ {df.index[-1].strftime('%Y-%m-%d %H:%M')}")
  print(f"### Save Complete : \"{market_type}\" \"{symbol}\" \"{interval}\" Klines/Candles [ {all_data_path} ]")
  return df, all_data_path

 

이 기능을 실행하면 바이낸스 Historycal Market Data에서 다운받은 데이터를 이용해 최소한의 REST API 호출로 비트코인 선물 1분지표 캔들 데이터를 판다스 데이터프레임형태로 반환하면서 1개의 CSV로 만들어 결과를 1분 안쪽으로 만들어 준다. (나의 썩어가는 노트북에서 실행하면 40초 정도가 걸린다)

 

하다보니 앞서 포스팅한 스크립트의 내용이 조금 수정되어 로그가 살짝 달라지긴 했지만...

최종 확인을 위해 실행하면 아래와 같이 캔들데이터를 1개의 csv로 합친 결과로 저장까지 잘 완료를 할 수 있었다.

>>> klines_merge()
Requirement already satisfied: pandas in /Users/kiseo/.pyenv/versions/test/lib/python3.11/site-packages (from -r /var/folders/q1/fvl09ysd21zf56b1d_mhk1cw0000gn/T/candle_download_7e52gmvm/python/requirements.txt (line 1)) (2.1.2)
Requirement already satisfied: numpy<2,>=1.23.2 in /Users/kiseo/.pyenv/versions/test/lib/python3.11/site-packages (from pandas->-r /var/folders/q1/fvl09ysd21zf56b1d_mhk1cw0000gn/T/candle_download_7e52gmvm/python/requirements.txt (line 1)) (1.26.1)
Requirement already satisfied: python-dateutil>=2.8.2 in /Users/kiseo/.pyenv/versions/test/lib/python3.11/site-packages (from pandas->-r /var/folders/q1/fvl09ysd21zf56b1d_mhk1cw0000gn/T/candle_download_7e52gmvm/python/requirements.txt (line 1)) (2.8.2)
... (생략)
DataFrame(Header exist): /Users/name/binance_data/data/futures/um/monthly/klines/BTCUSDT/1m/BTCUSDT-1m-2023-09.csv
DataFrame(Header exist): /Users/name/binance_data/data/futures/um/monthly/klines/BTCUSDT/1m/BTCUSDT-1m-2023-10.csv
### Save History Klines/Candles (Download) 2,024,640 : /Users/name/binance_data/um_BTCUSDT_1m_history.csv
### Save Recents Klines/Candles (REST API) 1,074 : /Users/name/binance_data/um_BTCUSDT_1m_recent.csv
### Save Data Range 2,025,714 Candles : 2020-01-01 09:00 ~ 2023-11-08 02:53
### Save Complete : "um" "BTCUSDT" "1m" Klines/Candles [ /Users/name/binance_data/um_BTCUSDT_1m_merge.csv ]
>>>

 

2-2. Python 스크립트 완성

완성된 스크립트는 아래와 같다. binance_merge_candle.py 이름으로 저장해서 "python3 binance_merge_candle.py" 로 실행하면 __main__ 부분에 kline_merge가 동작하면서 지금까지 작성한 기능들을 순서대로 실행하며 바이낸스 선물 1분지표를 모두 다운로드 받게 된다.

(<더보기>를 클릭하자, 길어서 줄여놨다)

더보기
# -*- coding: utf-8 -*-
#!/usr/bin/env python3
import sys
import tempfile
import subprocess
from datetime import datetime, timedelta
from zipfile import ZipFile
from os import environ, getenv, makedirs, getcwd, walk, remove
from os.path import basename, join, exists, expanduser as home

def pip_install(package):
  subprocess.check_call([sys.executable, "-m", "pip", "install", package])

def pip_install_requirements(requirements_dir):
  subprocess.check_call([sys.executable, "-m", "pip", "install", "-r", requirements_dir.rstrip(".txt")+".txt"])

## GitPython으로 git을 사용할 수 있도록 한다. 없다면 pip로 GitPython을 설치 한다.
try:
  from git import Repo
except:
  pip_install("GitPython")
  from git import Repo

## 캔들 데이터를 사용하기 위해서는 Pandas가 필요하다.
try:
  import pandas as pd
except:
  pip_install("pandas")
  import pandas as pd

## Pandas 짝꿍 Numpy가 필요하다.
try:
  import numpy as np
except:
  pip_install("numpy")
  import numpy as np

## Binance REST API로 다운로드된 데이터에서 부족한 부분만 가져올 수 있도록 requests를 사용하자.
try:
  import requests
except:
  pip_install("requests")
  import requests

## 바이낸스 퍼블릭 데이터 다운로드 소스코드를 Temp(임시폴더) 다운로드 받아서 사용하도록 한다.
repo_url = "https://github.com/binance/binance-public-data.git"
temp_path = tempfile.mkdtemp(prefix='candle_download_')

## git으로 소스코드를 임시폴더에 클론(다운로드) 시키고 위치를 저장해두자.
repo_path = Repo.clone_from(repo_url, temp_path)
WORK_PATH = repo_path.working_dir

## STORE_DIRECTORY 환경변수가 없으면 사용자폴더에 binance_data를 사용하도록 설정한다.
STORE_PATH = join(home('~'), "binance_data") if not "STORE_DIRECTORY" in environ.keys() else getenv("STORE_DIRECTORY")
environ["STORE_DIRECTORY"] = STORE_PATH

## 캔들 데이터를 다운로드 받는 download-kline.py를 실행한다.
def download_klines(cmd, args):
  subprocess.check_call(cmd + args)

## 저장할 위치가 없으면 만들어주고, download-kline.py에 다운로드 받을 코인 정보등을 입력한다.
def download_binance_datas(market_type="um", symbol="BTCUSDT", interval="1m"):
  # environ["STORE_DIRECTORY"] = "/Users/name/binance_data/"
  if not exists(STORE_PATH):
    makedirs(STORE_PATH)
  # Install requirements library
  pip_install_requirements(join(WORK_PATH, "python", "requirements.txt"))
  # configure download command, 
  kline_cmd = [sys.executable, join(WORK_PATH, "python", "download-kline.py")]
  if market_type == ["um", "cm"]:
    monthly_args = ["-t", market_type, "-s", symbol, "-i", interval, "-skip-daily", "1", "-startDate", "2020-01-01"]
  else:
    monthly_args = ["-t", market_type, "-s", symbol, "-i", interval, "-skip-daily", "1", "-startDate", "2017-08-01"]
  daily_args = ["-t", market_type, "-s", symbol, "-i", interval, "-skip-monthly", "1", "-startDate", f"{datetime.now().strftime('%Y-%m')}-01"]
  # excute download kline
  download_klines(kline_cmd, monthly_args)
  download_klines(kline_cmd, daily_args)

def klines_unzip(market_type="um", symbol="BTCUSDT", interval="1m", search_directory=STORE_PATH):
  search_directory = join(search_directory, 'data')
  for root, dirs, files in walk(search_directory):
    for file in files:
      if market_type in root and symbol in root and interval in root and file.lower().endswith('.zip'):
        zip_file_path = join(root, file)
        # 압축을 풀 디렉토리 선택 (zip 파일이 있는 폴더와 동일한 위치)
        extract_directory = root
        # 압축 파일 열기
        with ZipFile(zip_file_path, 'r') as zip_ref:
          # 압축 해제
          zip_ref.extractall(extract_directory)
        print(f'압축 해제: {zip_file_path} -> {extract_directory}')

def klines_history(market_type="um", symbol="BTCUSDT", interval="1m", search_directory=STORE_PATH):
  search_directory = join(search_directory, 'data')
  # 모든 CSV 파일을 저장할 데이터 프레임 초기화
  history = pd.DataFrame()
  for root, dirs, files in walk(search_directory):
    for file in sorted(files):
      if market_type in root and symbol in root and interval in root and file.lower().endswith('.csv') and file != f'{market_type}_{symbol}_{interval}_history.csv':
        csv_file_path = join(root, file)
        # 해더가 있는지 확인하고 넘어가야함. 첫 번째 라인 확인
        with open(csv_file_path, 'r') as file:
          first_line = file.readline()
          # 컬럼 헤더가 있는 경우
          if 'open_time' in first_line or 'Open' in first_line or 'open' in first_line:
            print(f'DataFrame(Header exist): {csv_file_path}')
            df = pd.read_csv(csv_file_path)  # header=0 (기본값)
          # 컬럼 헤더가 없는 경우
          else:
            print(f'DataFrame(Header empty): {csv_file_path}')
            df = pd.read_csv(csv_file_path, header=None)
            df.columns = ['open_time', 'open','high', 'low', 'close', 'volume', 'close_time', 'quote_volume', 'count', 'taker_buy_volume', 'taker_buy_quote_volume', 'ignore']
        df = df.iloc[:, :6]
        df.columns = ['datetime', 'open','high', 'low', 'close', 'volume']
        history = pd.concat([history, df])
  history.index = pd.to_datetime(history['datetime'], unit='ms', utc=True)
  history = history.astype(float)
  history = history.tz_convert('Asia/Seoul')
  # index를 numpy배열로 만들어 중복된 정보를 제거한다.
  history = history.iloc[np.unique(history.index.values, return_index=True)[1]]
  history_file_path = join(STORE_PATH, f'{market_type}_{symbol}_{interval}_history.csv')
  print(f"### Save History Klines/Candles (Download) {format(len(history), ',')} :", history_file_path)
  history.to_csv(history_file_path, index=False)
  return history, history_file_path, history['datetime'].iloc[-1]

def binance_klines(market_type="um", symbol="BTCUSDT", interval="1m", start_time=None, end_time=None, limit=1000):
    if market_type == "um":
      url = "https://fapi.binance.com/fapi/v1/klines"
    else:
      url = "https://api.binance.com/api/v3/klines"
    params = {
        "symbol": symbol,
        "interval": interval,
        "startTime": start_time,
        "endTime": end_time,
        "limit": limit
    }
    res = requests.get(url, params=params)
    value = res.json()
    df = pd.DataFrame(value)
    df = df.iloc[:, :6]
    df.columns = ['datetime', 'open','high', 'low', 'close', 'volume']
    df.index = pd.to_datetime(df['datetime'], unit='ms', utc=True)
    df = df.astype(float)
    df = df.tz_convert('Asia/Seoul')
    return df

def klines_recent(market_type="um", symbol="BTCUSDT", interval="1m", timestamp=None):
  now = datetime.now()
  recent = pd.DataFrame()
  if not timestamp:
    day_ago = now - timedelta(days=2)
    timestamp = int(day_ago.timestamp() * 1000)
  while timestamp < now.timestamp() * 1000:
    df = binance_klines(market_type, symbol, interval, start_time=timestamp, limit=1000)
    recent = pd.concat([recent, df])
    timestamp += 60000 * 1000
  recent_file_path = join(STORE_PATH, f'{market_type}_{symbol}_{interval}_recent.csv')
  print(f"### Save Recents Klines/Candles (REST API) {format(len(recent), ',')} :", recent_file_path)
  recent.to_csv(recent_file_path, index=False)
  return recent, recent_file_path, recent['datetime'].iloc[-1]

def klines_merge(market_type="um", symbol="BTCUSDT", interval="1m"):
  download_binance_datas(market_type, symbol, interval)
  klines_unzip(market_type, symbol, interval, search_directory=STORE_PATH)
  history_df, history_file_path, history_last_timestamp = klines_history(market_type, symbol, interval, STORE_PATH)
  recent_df, recent_file_path, recent_last_stamp = klines_recent(market_type, symbol, interval, timestamp=int(history_last_timestamp + 60000))
  df = pd.concat([history_df, recent_df])
  all_data_path = join(STORE_PATH, f'{market_type}_{symbol}_{interval}_merge.csv')
  df.to_csv(all_data_path, index=False)
  print(f"### Save Data Range {format(len(df), ',')} Candles : {df.index[0].strftime('%Y-%m-%d %H:%M')} ~ {df.index[-1].strftime('%Y-%m-%d %H:%M')}")
  print(f"### Save Complete : \"{market_type}\" \"{symbol}\" \"{interval}\" Klines/Candles [ {all_data_path} ]")
  return df, all_data_path

if __name__ == "__main__":
  market_type="um"
  symbol="BTCUSDT"
  interval="1m"
  df, data_path = klines_merge(market_type, symbol, interval)

 

2-3 다운로드 기능을 실행

위와 같이 잘 만들어둔 다운로드 모듈을 파이썬으로 불러와 사용하거, 별도로 실행하면 된다.

파이썬 모듈로 불러오는 경우 아래와 같이 사용할 수 있다. (Dataframe 하나와 캔들데이터가 하나로 합쳐진 CSV 경로를 받는다.)

## 모듈 형태로 불러와 사용하자.
from binance_merge_candle import klines_merge

## 2개의 리턴값을 사용할 수 있다.
df, all_data_path = klines_merge("um", "BTCUSDT", "1h")

 

또는, 쉘에서 파이썬 파일을 바로 실행할 수 있다. ("__main__" 에 market_type, symbol, interval을 수정하고 실행하자.)

## 현재 실행한 위치에 /binance_data폴더를 만들고 데이터를 다운받아 저장한다.
> export STORE_DIRECTORY=$PWD/binance_data python3 binance_merge_candle.py

 

3. 비트코인 선물 3년치 1분지표

1개의 파일로 생성된 CSV파일은 원본 CSV들을 합친것 보다 용량이 줄어든다.

Pandas로 데이터를 가공해 사용할때 너무 무거울까봐 걱정했던 부분이 조금 수월해진 것 같다.

Binance Futures 1m All Time Candle Data
Binance Futures 1m All Time Candle Data

 

3-1. Python내에서  Dataframe형태로 사용할 수 있다

이제 1m_merge.csv를 판다스로 불러오거나 kline_merge를 외부에서 Python모듈로 불러와 일정 기간의 캔들 데이터를 Pandas 데이터프레임 형태로 쉽게 가공해 사용하면 된다.

 

저장한 csv은 Pandas Dataframe형태로 읽어 들여 사용할 수 있다.

index를 datetime 타입으로 지정해 특정 시간 범위의 데이터를 간단하게 배열 형태로 가공해서 사용하는 방법이 가능해진다.

## 바이낸스 선물 비트코인 1분 캔들을 모두 다운받고 확인해보자.
>>> import pandas as pd
>>> from binance_merge_candle import kline_merge
>>> data, merge_file_path = kline_merge("um", "BTCUSDT", "1m")

>>> print (mgerge_file_path)
/Users/name/binance_data/um_BTCUSDT_1m_merge.csv

>>> data.index = pd.to_datetime(data['datetime'], unit='ms', utc=True)
>>> data = data.astype(float)
>>> data = data.tz_convert('Asia/Seoul')
>>> data
                               datetime      open      high       low     close   volume
datetime
2020-01-01 09:00:00+09:00  1.577837e+12   7189.43   7190.52   7177.00   7182.44  246.092
2020-01-01 09:01:00+09:00  1.577837e+12   7182.43   7182.44   7178.75   7179.01   70.909
2020-01-01 09:02:00+09:00  1.577837e+12   7179.01   7179.01   7175.25   7177.93   99.420
2020-01-01 09:03:00+09:00  1.577837e+12   7177.77   7182.60   7177.00   7181.11   69.330
2020-01-01 09:04:00+09:00  1.577837e+12   7179.10   7179.10   7172.94   7175.25   97.368
...                                 ...       ...       ...       ...       ...      ...
2023-11-08 16:38:00+09:00  1.699429e+12  35293.00  35297.10  35283.40  35287.00   49.566
2023-11-08 16:39:00+09:00  1.699429e+12  35286.90  35293.30  35285.70  35286.00   42.852
2023-11-08 16:40:00+09:00  1.699429e+12  35287.00  35288.20  35284.00  35284.10   25.221
2023-11-08 16:41:00+09:00  1.699429e+12  35284.00  35295.40  35284.00  35294.60   50.474
2023-11-08 16:42:00+09:00  1.699429e+12  35294.50  35306.10  35294.50  35306.10   33.055

[2026543 rows x 6 columns]
>>> df = data['2023-01':]
>>> df
                               datetime     open     high      low    close   volume
datetime
2023-01-01 00:00:00+09:00  1.672499e+12  16584.8  16589.8  16583.6  16589.8  191.911
2023-01-01 00:01:00+09:00  1.672499e+12  16589.8  16591.2  16589.8  16590.3  143.386
2023-01-01 00:02:00+09:00  1.672499e+12  16590.4  16590.4  16590.3  16590.3   46.179
2023-01-01 00:03:00+09:00  1.672499e+12  16590.3  16594.7  16590.3  16594.7   86.854
2023-01-01 00:04:00+09:00  1.672499e+12  16594.7  16597.8  16594.6  16596.8  137.128
...                                 ...      ...      ...      ...      ...      ...
2023-11-08 16:38:00+09:00  1.699429e+12  35293.0  35297.1  35283.4  35287.0   49.566
2023-11-08 16:39:00+09:00  1.699429e+12  35286.9  35293.3  35285.7  35286.0   42.852
2023-11-08 16:40:00+09:00  1.699429e+12  35287.0  35288.2  35284.0  35284.1   25.221
2023-11-08 16:41:00+09:00  1.699429e+12  35284.0  35295.4  35284.0  35294.6   50.474
2023-11-08 16:42:00+09:00  1.699429e+12  35294.5  35306.1  35294.5  35306.1   33.055

[448843 rows x 6 columns]
>>>

Pandas를 잘 몰랐는데 이참에 공부를 좀 해야 겠다. ^^ 

 

마치며

머신러닝에 캔들 데이터를 써먹으려고 2백만개의 1분지표 캔들 데이터를 한방에 모으는 작업을 했다. 이미 시계열 데이터를 머신러닝으로 예측하는 머신러닝 관련 자료들이 많아서 수집한 캔들 자료를 딱맞춰서 써먹을 수 있는 작업을 진행해 보도록 하겠다.

2023-11-11 Update: market_type을 변수로 선물/현물 데이터를 선택적으로 받을수 있도록 수정함.

이 스크립트의 csv를 합치는 기능은 급조되고 조악한 수준으로 인해 약간의 문제가 있다. ^^;;;

  • csv를 하나로합칠때 Symbol, Interval 폴더의 구분없이 csv 파일로 무작정 합쳐서 1개로 저장한다.
  • Symbol, Interval을 조건으로 csv를 만들어 주지 않기때문에 -> Symbol, Interval을 조건으로 사용하도록 수정을하거나, data폴더를 지우는 방식으로 초기화 하면서 1종류의 Symbol, Interval만 처리 하도록 해야 한다.