【InfluxDB】bitFlyer Realtime APIの情報を全部保存したらどうなるか

April 09, 2020

データ量的にはそれほどではないが通信の頻度と件数が多く一般のPCが使用に耐えられるのかどうかという検証。

logo

なぜ記録するのか

長らくInfluxDBを取り扱ってきたがようやくbitFlyerのRealtime APIより流れてくるデータの記録をスタートさせる準備が整った。

全ては今後投資のバックテスト等を行うためだ。

何をテストするのか

次の項目をチェックしていく。

  1. データ容量がどれぐらいになるのか
  2. サーバーの回線はWi-Fiでも大丈夫なのか
  3. スペックは十分なのか
  4. 電気代は
  5. 夏に耐えられるのか

今回は初回のため1回で全てチェックするのは難しいため継続的に検証する予定。

当面はデータの容量を見ていく予定だが、データの欠落や回線が切断されたときの挙動もなんとかして調べる方法はないだろうか(アイデアなし)

回線切断に関してはsocket.ioが自動で再接続を試みるらしいが、切断された時のログ出力など実装していないため不安である。なにかアイデアがあれば実装したい。

記録サーバーのスペック

仮想通貨投資においてはAmazonのAWSを使っているプレイヤーが多い印象だが自宅サーバーでの記録を実験してみる。

spec

メモリは16G。8~9年前のモデルだがInfluxDBのスペック表で見ると十分な性能だ。容量は少ないがSSDを搭載している。

ただ、普通のPCであるため電気代と発熱が気になる。長時間の使用に耐えられるのだろうか。

記録に使うコードを紹介

これまでの記事よりNode.jsを継続して使用。サーバー本体が直接socket.ioにてbitFlyerのAPIに接続し「node-influx」というInfluxDB用のクライアントパッケージで書き込んでいる。

コード(一部)

index.mjs

import bF from './bitFlyer.js'
import * as db from './influx.mjs'

const socket_bF_client = bF()
const influx = db.getDatabaseNames();

// スナップショット
socket_bF_client.on("lightning_board_snapshot_FX_BTC_JPY", (message) => {
    db.writeBoardData({ message, influx, type: "snapshot" })
})

// 差分
socket_bF_client.on("lightning_board_FX_BTC_JPY", (message) => {
    db.writeBoardData({ message, influx, type: "update" })
})

// 約定
socket_bF_client.on("lightning_executions_FX_BTC_JPY", (message) => {
    db.writeExecutions({ message, influx})
})

// ティッカー
socket_bF_client.on("lightning_ticker_FX_BTC_JPY", (message) => {
    db.writeTicker({ message, influx})
})

influx.mjs

import Influx from 'influx'
import microtime from 'microtime'

export const getDatabaseNames = function () {

    const influx = new Influx.InfluxDB({
        host: 'localhost',
        database: 'bitFlyer_db',
        schema: [
            {
                measurement: 'lightning_board_FX_BTC_JPY',
                fields: {
                    price: Influx.FieldType.INTEGER,
                    size: Influx.FieldType.FLOAT,
                },
                tags: [
                    'type',
                    'bid_or_ask',
                ]
            },
            {
                measurement: 'lightning_executions_FX_BTC_JPY',
                fields: {
                    id: Influx.FieldType.INTEGER,
                    price: Influx.FieldType.INTEGER,
                    size: Influx.FieldType.FLOAT,
                    exec_date: Influx.FieldType.STRING,
                    buy_child_order_acceptance_id: Influx.FieldType.STRING,
                    sell_child_order_acceptance_id: Influx.FieldType.STRING,
                },
                tags: [
                    'side',
                ]
            },
            {
                measurement: 'lightning_ticker_FX_BTC_JPY',
                fields: {
                    product_code: Influx.FieldType.STRING,
                    timestamp: Influx.FieldType.STRING,
                    tick_id: Influx.FieldType.INTEGER,
                    best_bid: Influx.FieldType.INTEGER,
                    best_ask: Influx.FieldType.INTEGER,
                    best_bid_size: Influx.FieldType.FLOAT,
                    best_ask_size: Influx.FieldType.FLOAT,
                    total_bid_depth: Influx.FieldType.FLOAT,
                    total_ask_depth: Influx.FieldType.FLOAT,
                    ltp: Influx.FieldType.INTEGER,
                    volume: Influx.FieldType.FLOAT,
                    volume_by_product: Influx.FieldType.FLOAT,
                },
                tags: []
            },
        ]
    })

    influx.getDatabaseNames()
        .then(names => {
            if (!names.includes('bitFlyer_db')) {
                return influx.createDatabase('bitFlyer_db');
            }
        })
        .catch(err => {
            console.error(`Error creating Influx database! : ` + err);
        })

    return influx;
}

// Ticker
export const writeTicker = ({ message, influx }) => {

    //Tickerはズラす必要がない
    const output_data = {
        measurement: 'lightning_ticker_FX_BTC_JPY',
        fields: {
            product_code: message.product_code,
            timestamp: message.timestamp,
            tick_id: message.tick_id,
            best_bid: message.best_bid,
            best_ask: message.best_ask,
            best_bid_size: message.best_bid_size,
            best_ask_size: message.best_ask_size,
            total_bid_depth: message.total_bid_depth,
            total_ask_depth: message.total_ask_depth,
            ltp: message.ltp,
            volume: message.volume,
            volume_by_product: message.volume_by_product
        },
        tags: []
    }

    influx.writePoints([output_data]).then(() => {

    }).catch(err => {
        console.error(`Error saving Ticker data to InfluxDB! ${err.stack}`)
    })
}

export const writeExecutions = ({ message, influx }) => {

    //ナノ秒
    const nano_date = BigInt(microtime.now() * 1000)

    const output_data = message.map((data, index) => {
        //ずらし
        const indexed_date = Influx.toNanoDate(nano_date + BigInt(index))
        return {
            measurement: 'lightning_executions_FX_BTC_JPY',
            fields: {
                id: data.id,
                price: data.price,
                size: data.size,
                exec_date: data.exec_date,
                buy_child_order_acceptance_id: data.buy_child_order_acceptance_id,
                sell_child_order_acceptance_id: data.ell_child_order_acceptance_id
            },
            tags: {
                side: data.side
            },
            timestamp: indexed_date
        }
    })

    influx.writePoints(output_data).then(() => {

    }).catch(err => {
        console.error(`Error saving Executions data to InfluxDB! ${err.stack}`)
    })
}

export const writeBoardData = ({ message, influx, type }) => {

    //ナノ秒
    const nano_date = BigInt(microtime.now() * 1000)

    const asks_data = message.asks.map((asks, index) => {
        //ずらし
        const indexed_date = Influx.toNanoDate(nano_date + BigInt(index))
        return {
            measurement: 'lightning_board_FX_BTC_JPY',
            tags: {
                type,
                bid_or_ask: 'ask',
            },
            fields: {
                price: asks.price,
                size: asks.size
            },
            timestamp: indexed_date
        }
    })

    const bids_data = message.bids.map((bids, index) => {
        //ずらし
        const indexed_date = Influx.toNanoDate(nano_date + BigInt(index))
        return {
            measurement: 'lightning_board_FX_BTC_JPY',
            tags: {
                type,
                bid_or_ask: 'bid',
            },
            fields: {
                price: bids.price,
                size: bids.size
            },
            timestamp: indexed_date
        }
    })

    influx.writePoints([...asks_data, ...bids_data]).then(() => {

    }).catch(err => {
        console.error(`Error saving Board data to InfluxDB! ${err.stack}`)
    })
}

基本コード内でやっている事は過去の記事で紹介したことのみである。少しコードが冗長である気がするが時間の都合でこのままの実装だ。

初日の検証報告(本日のデータ量)

この記事の執筆時で4時間と45分稼働した。115MBのデータがこの時間で増加している。

1日1ギガバイトになるかと見積もっていたが、この量なら500MB~多くて1G/日といったところだろうか。約定履歴のみで3G/monthという情報を頂いているので、板情報を差分含め記録している分それなりに覚悟せねばならない。

さすがに稼働時間が短いこともあって今の所Wi-Fi等にも問題はない。

追加でやりたいこと

長時間運用するにあたって障害が発生する兆しを少しでも把握するため、CPUやMEMの使用状況も合わせて記録してみたい。

これにはTelegrafというソフトウェアを追加で使用するのがベストかと思う。ハードウェアの使用状況などを記録するために利用するソフトウェアだ。

TelegrafがあればどれぐらいのSSD容量を消費するのかもグラフで確認出来るかと思う。

所感

自宅サーバーのようなことをやるのは非常に久々でありディスクアクセスのランプが24時間チカチカしているのはやはり少し不安になる。

とりあえずはデータ分析用としてでなく記録が継続できるかのテストなので気軽にやっていきたい。

この記事をシェア:

author icon

仮想トイレ @CrypticToilet
プログラミングや仮想通貨のシステムトレードに関する情報を更新中!どんな情報を流しても詰まらないトイレ。