【InfluxDB】バッチ登録のすゝめ【bitFlyer 板データ保存】

April 11, 2020

PCの障害を避けるためにバッチ処理でデータを登録する方法を公式がオススメしていたので実装した。

logo

今日の画像は適当ではない。Batchはパンなど一焼き分(プレートに纏めて何個も置くやつ)の意味でTOEICに出るので覚えておこう。

ことの発端

朝起きたら記録用サーバーが落ちており、BIOS画面になっていた。

なにがあったのかと調べるとSSDからOSが起動しない。Windowsのエラーログは異常なしだ。

どういうわけかSSDのホットプラグ設定をONにしたら復旧。

原因はいまだ不明だが非常に細かいInfluxDBの書き込みが原因なのではと想像がつく。

公式の見解:バッチのすゝめ

batchの解説 にて「5000-10000件で纏めて登録したほうが効率が良い」と書いてあるのを発見した。

具体的な数字が挙げられている点がとても気になる。今回は稼働率確保のためにも実装してみることにした。

実装

今回は初心者向けに解説するのは不可能なレベルまできたので詳しくは説明しない。

アルゴリズムとしては入ってきたデータをグローバル変数に蓄積し、要素数がしきい値を超えた時にのみwritePoints()を実行するというシンプルなアルゴリズムだ。

覚えておきたい事:非同期処理

writePoints() は非同期処理である。したがって単純に件数でしきい値を設定していると書き込み中さらに件数の判定が入った場合は重複してwritePoints() が呼び出されることとなりかなり危険なコードとなる。

そこで書き込み中に流れてきたデータは先にバッファに入れて件数を0にしてしまってから書き込む処理で実装した。

コード一部紹介

今回は複雑になりそうな事が予想できたのでクラス化した。シャープの記号は「プライベートフィールド宣言」と言うらしい。オブジェクトの外から参照できなくするものだ。

一応言及しておくが、クラス内のメソッドを簡単にプライベートにする構文は今のところNode.jsには無い(はず)

代替案として様々な方法が提唱されてはいるが、たちまち可読性が悪くなるため、昔ながらのアンダーバーで実装した(アクセスは可能だが、暗黙の了解でアクセスしないでねという意味)

export class DataStore {
    #push_data = []
    #buff = []
    #influx = null
    #threshold = 5000 //default
    date = new Date()
    constructor(threshold) {
        this._getDatabaseNames()
        threshold != undefined ? this.#threshold = threshold : null
    }

    set store(data) {
        Array.isArray(data) ? this.#push_data = this.#push_data.concat(data) : this.#push_data.push(data)
        const _n = this.#push_data.length
        if (_n > this.#threshold) {
            this.#buff = this.#push_data.concat(this.#buff)
            this.#push_data = []
            this._write()
        }
    }
    get influx() {
        return this.#influx
    }
    _getDate() {
        return this.date.getFullYear() + "/" + this.date.getMonth() + 1 + "/" + this.date.getDate() + "/" + this.date.getDay()
    }
    _write() {
        if (this.#influx != null) {
            const _length = this.#buff.length
            this.#influx.writePoints(this.#buff).then(() => {
                if (_length === this.#buff.length) {
                    console.log(this._getDate() + ": Data output(" + this.#buff.length + ")")
                    this.#buff = [] //正常
                } else {
                    //処理が追いついていないとwritePoints前のlengthと後のlengthに差異が出る
                    this.#buff.splice(0, _length)
                    console.log(this._getDate() + ": writePoints() delays! Remain:" + this.#buff.length)
                }

            }).catch(err => {
                console.error(`Error saving data to InfluxDB! ${err.stack}`)
            })
        } else {
            console.log("Error:Influx is empty!")
        }
    }
    _getDatabaseNames() {

        this.#influx = new Influx.InfluxDB({
            host: 'localhost',
            database: 'bitFlyer_db',
            schema: [
                {
                    measurement: 'lightning_board_FX_BTC_JPY',
                    fields: {
                        price: Influx.FieldType.INTEGER,
                        size: Influx.FieldType.FLOAT,
                    },
                    tags: [
                        'type',
                        'bid_or_ask',
                    ]
                },
                {
                    measurement: 'lightning_executions_FX_BTC_JPY',
                    fields: {
                        id: Influx.FieldType.INTEGER,
                        price: Influx.FieldType.INTEGER,
                        size: Influx.FieldType.FLOAT,
                        exec_date: Influx.FieldType.STRING,
                        buy_child_order_acceptance_id: Influx.FieldType.STRING,
                        sell_child_order_acceptance_id: Influx.FieldType.STRING,
                    },
                    tags: [
                        'side',
                    ]
                },
                {
                    measurement: 'lightning_ticker_FX_BTC_JPY',
                    fields: {
                        product_code: Influx.FieldType.STRING,
                        //timestamp: Influx.FieldType.STRING,
                        tick_id: Influx.FieldType.INTEGER,
                        best_bid: Influx.FieldType.INTEGER,
                        best_ask: Influx.FieldType.INTEGER,
                        best_bid_size: Influx.FieldType.FLOAT,
                        best_ask_size: Influx.FieldType.FLOAT,
                        total_bid_depth: Influx.FieldType.FLOAT,
                        total_ask_depth: Influx.FieldType.FLOAT,
                        ltp: Influx.FieldType.INTEGER,
                        volume: Influx.FieldType.FLOAT,
                        volume_by_product: Influx.FieldType.FLOAT,
                    },
                    tags: []
                },
            ]
        })

        this.#influx.getDatabaseNames()
            .then(names => {
                if (!names.includes('bitFlyer_db')) {
                    return this.#influx.createDatabase('bitFlyer_db');
                }
            })
            .catch(err => {
                console.error(`Error creating Influx database! : ` + err);
            })
    }
}

クラス定義なので呼び出す側のjsでnewする必要がある。

さいごに

今回はとにかくレコードは1件づつではなく纏めて登録する「バッチ処理」が「公式からおすすめ」されていた点に尽きる。

まだ動作テストが十分ではないが運用しながらこれで良いか試していきたい。

2020/08/04:これが大成功で長時間の安定運用が可能になった。是非試してみてほしい。

この記事をシェア:

author icon

仮想トイレ @CrypticToilet
プログラミングや仮想通貨のシステムトレードに関する情報を更新中!どんな情報を流しても詰まらないトイレ。