【InfluxDB】OFFSETを使わないでクエリを分割する【InfluxQL】

May 20, 2020

巨大なデータをSELECTする場合OFFSETを用いてクエリを分割すると処理時間が長くなってしまうので使わないように変更する。

先日の続き

先日の記事でInfluxQLにおいてLIMITOFFSETを使いクエリを分割する方法を実装した。

しかしOFFSETは仕組み上クエリ毎にOFFSET値までの検索を走らせる必要があるため内部で重複したSELECTが発生している。

つまり単純に1回のクエリでデータを取得する場合と比べ、分割クエリは処理時間が長くなることが分かっている。

そこで今回は各クエリにおいて最終レコードのタイムスタンプを使い次のWHERE句を再生成することによりオーバーヘッドを軽減する。

前提

Node.js上からInfluxDBを操作するライブラリnode-influxを使用する。

手順

それほど複雑ではないが分かりやすく順番に解説する。

①ループする回数を計算

SELECT COUNT(*) ~で一度全件のカウントを取る。この時COUNTは非常に処理が軽いためクエリ分割は必要ない。

また件数は帰ってきた配列[0].countというメンバ変数に入る点に注意。

const ループ回数 = Math.floor(配列[0].count / 設定するLIMIT);

LIMITを指定したSELECT文を発行

SELECTFROMWHERE time > 開始日時 AND time < 終了日時 LIMIT 1000

これにより最初の1000行が抽出される。私の場合2~3秒で1クエリとなるよう10万件でLIMITしている。

③プログラムで最終行の日付を取得する

.queryの結果がdataという配列に入るようにしている。

const lastDate = data[data.length - 1].time.toNanoISOString()

必ずタイムスタンプがtimeというオブジェクトに入っている。このオブジェクトnode-influx特有のもの。

いくつかメソッドを持っているおりtoNanoISOString()を利用するとナノ秒を含めた日付文字列を取得できる。

また初心者がハマりやすい点として配列の最後の要素はlength - 1である点は注意。

④先程抽出した日付をWHERE句に追記

次に発行するクエリに追記する。

SELECTFROMWHERE
time > 開始日時 AND 
time < 終了日時 AND
time > ${lastDate} -- 追記
LIMIT 1000

最初のクエリにAND条件を一つ追記した。

注意すべき点は不等号が「大なり」になっている点だ(逆にした場合は小なり)

イコールをつけてしまうと最後と同じ行が重複して取得されるためにこう記述する。

余談:time > 開始日時の条件は重複なので削ってしまって良い。私の場合何かしらの改修orバグ発生時にここが抜け落ちているとえげつない量の件数がSELECTされる可能性もあり、条件が重複しても正常動作するためそのままにしている。オーバーヘッド的にはそれほどではないと考えている。

③に戻る

あとは③に戻って繰り返すだけだ。

技術的な注意点

ここは少し難しい。前回説明していないのでこちらで説明する。

クエリの発行は非同期で行われる。つまり複数のクエリが同時進行で処理されてしまう。

そのまま分けたクエリを同時に処理することも出来なくはないが量が多すぎてタイムアウトなどで破棄されるパターンがある。

私のコードでは大量にクエリを送信することのメリットはそれほど無いのでawaitにより1件づつ実行している。

非同期処理を1件づつ順番に行う方法

無名関数によりループ部にasyncを付ける。

.querypromiseを返すのでasync/awaitにて待つことが出来る。もっと深く勉強したければ私の次の記事も読んでみて欲しい。

Promiseとasync/awaitが正しく機能しない時に読む記事

const loopMax = Math.floor(recordCount / select_limit); //←重要!セミコロンで明示的に行を終わる!
(async () => { //asyncの無名関数
    for (let loopNo = 0; loopNo <= loopMax; loopNo++) { //ループ回数を考慮
        await queryMain(loopNo, loopMax) //ここにawait
    }
    //データのファイル出力
    exportData()
})();

無名関数の文法については他のWEBサイトへ譲る。

このコードが関数であれば 関数名 = () => {}を変更し関数名 = async() => {}でも良い。

余談:私は無名関数の構文はこのasyncが欲しいパターンでしか使わないが他に便利なケースがあるのだろうか?新しい記事が書けるかも。

queryMain()の中身

//前省略
return remoteDB.query(limitedQuery) //ここでreturn
.then((res) => {
//後ろ省略

queryメソッドをreturnすればそのままpromiseになる。

実行時間

この実装により実行時間は1回のクエリで纏めて取得した時間とほぼ変わらない数値がでた。かなりのオーバーヘッド削減になったと思われる。

さいごに

実装してみると意外に簡単であることが分かる。

どうせなので今回のコードを入れた5秒足をつくるコード全体を公開しておく。

コード

グローバル変数が多いので余りスマートな実装だとは思えないがよければ参考にしてほしい。

InfluxDBとその設定関係は独自のソースを使っているのでコピペでは動かない。各自実装すること。

import Config from '../utils/config.mjs'
import DB from './libs/classes.mjs'
import dayjs from 'dayjs'
import * as fs from 'fs'

//------------------------------------------------------------------------------------
// 5秒足作成
// v0.1.3
// 2020/05/18   クエリを分けるシステム実装(OFFSETを使わない方式に変更)
//              空になってしまった足も生成するように変更
// 2020/05/07   日付ラベルの計算式を修正
//------------------------------------------------------------------------------------

//------------------------------------------
// 設定
//------------------------------------------

const sec_time_span = 5 //秒足の秒数
const select_limit = 100000 //1回のクエリで取得するレコード数
const use_validation = true //生成された足に不整合がないか検証する

//------------------------------------------

// グローバル変数
let candle = [] //足が格納される箱
let start_time = null //一番最初の足の日時を設定する
let errors = [] //エラー報告用
const config = new Config('../config/config.yaml') //DB用設定の読み込み
const process_time_start = process.hrtime() //計測用
let lastDate = null //OFFSETを使わないページネーション用

// DBサーバ
const remoteDB = new DB({
    host: config.db_host_remote,
    username: config.db_username,
    password: config.db_password,
}).influx

if (sec_time_span - Math.floor(sec_time_span) != 0) {
    console.log(`Error: sec_time_spanは整数である必要があります。`)
    process.exit(1);
} else if (60 % sec_time_span) {
    console.log(`Error: 1分(60秒)はsec_time_span(${sec_time_span}秒)では割り切れません。`)
    process.exit(1);
} else {
    console.log(`----- Start Creating ${sec_time_span}seconds OHLCV Data -----`)
}

//日時指定 外部から処理を委託する場合はここに引き渡し処理を記述
const start_date = '2020-05-17'
const end_date = '2020-05-18'

const allQuery = `SELECT * FROM "bitFlyer_db"."autogen"."lightning_executions_FX_BTC_JPY" WHERE time > '${start_date}' AND time < '${end_date}'`
const countQuery = `SELECT COUNT(id) FROM "bitFlyer_db"."autogen"."lightning_executions_FX_BTC_JPY" WHERE time > '${start_date}' AND time < '${end_date}'`

remoteDB.query(countQuery) //件数をカウント
    .then((res) => {
        if (res.length === 0) { console.log("No Data Counted."); process.exit(1); }
        const recordCount = res[0].count //レコード数
        console.log(`Total Records: ${recordCount}`)
        const loopMax = Math.floor(recordCount / select_limit);
        (async () => {
            for (let loopNo = 0; loopNo <= loopMax; loopNo++) {
                await queryMain(loopNo, loopMax) //分割してメインクエリを発行
            }
            //データのファイル出力
            exportData()
        })();

    })
    .catch((err) => {
        console.log(err); process.exit(1);
    })

//メインのクエリ発行
const queryMain = (loopNo, loopMax) => {
    let limitedQuery = null
    if (lastDate === null) { limitedQuery = `${allQuery} LIMIT ${select_limit}` } // 最初のレコードだけ日付を絞らない
    else { limitedQuery = `${allQuery} AND time > '${lastDate}' LIMIT ${select_limit}` } // ページネーションコード
    console.log(`Execute Query: ${loopNo} / ${loopMax}`)
    return remoteDB.query(limitedQuery)
        .then((res) => {
            if (res.length === 0) { console.log("No Data Selected."); process.exit(1); }
            candleMain(res) //ローソク足生成
            lastDate = res[res.length - 1].time.toNanoISOString()//最後の日付を記録して次のページネーションを行っている
        }).catch((err) => {
            console.log(err)
        })
}

const candleMain = (data) => {
    //初回のみ最初の足の日時を決定する
    if (candle.length === 0) { start_time = caliculateStartTime(data) }
    else if (start_time === null) {
        console.log("Error: Start Time is Null"); process.exit(1)
    }

    for (const d of data) {
        const arrayNum = caliculateArrayNum(start_time, d.exec_date)
        //要素を新規
        if (!candle[arrayNum]) {
            candle[arrayNum] = {
                date: dayjs(start_time).add(arrayNum * sec_time_span, 'second'), //日付ラベル
                O_date: dayjs(d.exec_date),
                O: d.price,
                H: d.price,
                L: d.price,
                C: d.price,
                C_date: dayjs(d.exec_date),
                V: { BUY: 0, SELL: 0 }
            }
        } else {

            //O より早いレコードが入ってきた場合oを更新
            if (candle[arrayNum].O_date > dayjs(d.exec_date)) {
                candle[arrayNum].O_date = dayjs(d.exec_date)
                candle[arrayNum].O = d.price
            }

            //Hを生成
            if (candle[arrayNum].H < d.price) {
                candle[arrayNum].H = d.price
            }

            //Lを生成
            if (candle[arrayNum].L > d.price) {
                candle[arrayNum].L = d.price
            }

            //C より遅いレコードが入ってきた場合Cを更新
            if (candle[arrayNum].C_date < dayjs(d.exec_date)) {
                candle[arrayNum].C_date = dayjs(d.exec_date)
                candle[arrayNum].C = d.price
            }

        }

        // Vを生成(BUY:SELL別Volume)
        // 注意:candle[arrayNum].V[d.side]としてはいけない
        // v.sideは板寄せ時の約定で空白が入っている可能性がある
        if (d.side === 'BUY') {
            candle[arrayNum].V.BUY += d.size
        } else if (d.side === 'SELL') {
            candle[arrayNum].V.SELL += d.size
        }
    }
}

// --------------------------------------------------------------------------------------------------------------------------------
// funcs 
// --------------------------------------------------------------------------------------------------------------------------------

const exportData = () => {

    //疎らな配列が出来るのでforで回す
    for (let i = 0; i <= candle.length - 1; i++) {

        //不整合がないか検証        
        if (use_validation) { validation({ c: candle[i], index: i }) }
        const c = candle[i]

        //出力
        const BUY = ('      ' + (Math.round(c.V.BUY * 100) / 100)).slice(-6) //文字のパディング
        const SELL = ('      ' + (Math.round(c.V.SELL * 100) / 100)).slice(-6) //文字のパディング
        console.log(`${dayjs(c.date).format('YYYY/MM/DD HH:mm:ss')} [O: ${c.O}][H: ${c.H}][L: ${c.L}][C: ${c.C}][BUY: ${BUY}][SELL: ${SELL}] ${i}`)
    }

    //レポート
    console.log("--- Report ---")
    for (const e of errors) {
        console.log(e)
    }

    //ファイルに出力
    console.log("Exporting JSON File.")
    fs.writeFileSync(`../data/candlestick(${sec_time_span}sec).json`, JSON.stringify(candle));
    console.log("JSON File Exported.")

    //処理時間計測
    const performance_time_end = process.hrtime(process_time_start)
    console.log(`Execution time: ${performance_time_end[0]}s ${performance_time_end[1] / 1000000}ms`)
}

// 生成した足にエラーがないか検証する
const validation = ({ c, index }) => {
    // 疎らなためデータが存在しなかった場合は空の箱を入れる
    if (!candle[index]) {
        console.log(`${index}を追加`)
        candle[index] = {
            date: dayjs(start_time).add(index * sec_time_span, 'second'), //日付ラベル
            O_date: dayjs(start_time).add(index * sec_time_span, 'second'),
            O: candle[index - 1].C,
            H: candle[index - 1].C,
            L: candle[index - 1].C,
            C: candle[index - 1].C,
            C_date: dayjs(start_time).add(index * sec_time_span, 'second'), //日付ラベルと同じものを入れる
            V: { BUY: 0, SELL: 0 }
        }
        c = candle[index]
    }
    if (c.L > c.H) errors.push(`Error(index${index}): L > H`) //LがHより大きい
    if (c.O_date > c.C_date) errors.push(`Error(${index}): O_date > C_date`) //Oの日付よりCの日付のほうが早い
    if (index > 0) {
        if (candle[index - 1]) {
            if (candle[index - 1].C_date > c.O_date) { errors.push(`Error(${index}): C_date[-1] > O_date`) } //前レコードのCの日付より今のレコードのOの日付のほうが早い
            const prevPlus5 = dayjs(candle[index - 1].date).unix() + sec_time_span
            const current = dayjs(candle[index].date).unix()
            if (prevPlus5 != current) { errors.push(`Error(${index}): Date Differs ${prevPlus5}(prev + 5) : ${current}(current)`) } //日付ラベルが不正
        } else {
            errors.push(`Error: index${index - 1} dosen't exists.`)
        }
    }
}

//開始時間の計算
const caliculateStartTime = (data) => {
    const start1 = dayjs(data[0].exec_date).second() //最初の秒数
    //console.log(`秒数 ${start1}`)
    const start2 = Math.floor(start1 / sec_time_span) //要素番号
    //console.log(`要素番号: ${start2}`)
    const start3 = start2 * sec_time_span
    //console.log(`スタート秒数: ${start3}`)
    const start4 = dayjs(data[0].exec_date).startOf('minute').second(start3)
    //console.log(`スタート時間: ${start4}`)
    return start4
}

//要素番号の計算
const caliculateArrayNum = (start_time, exec_date) => {
    const time = dayjs(exec_date).unix() - start_time.unix()
    return Math.floor(time / 5)
}
この記事をシェア:

author icon

仮想トイレ @CrypticToilet
プログラミングや仮想通貨のシステムトレードに関する情報を更新中!どんな情報を流しても詰まらないトイレ。