時系列DBに約定や板情報を記録したい (Node.js x InfluxDB)

March 31, 2020

時系列DB(InfluxDB)であれば仮想通貨取引の情報を上手く保存できるのではないかという調査

logo

データ保存の必要性

仮想通貨取引のBOTにおいて重要な資産は「今勝てるストラテジを持っていること」ではなく「過去の相場を分析できる環境」だ。

「今勝てるストラテジ」はエッジがなくなったらそれで終わりだが、いつでもデータを分析できる環境さえあれば新しいストラテジを生み出すことができる。

データを保存することはBOT開発のはじめの一歩である(分析の手法はその後で覚えれば良い)

★この意味で考えてnoteで販売されているBOTをとりあえず購入して動かすという習慣がある人は今すぐに辞めたほうが良い。

データの形式

思ったより多くのBOT開発者が生のCSV形式でデータを保存しているのではないだろうか。

CSVファイルは可視性は悪くないが、生のテキストが保存されているためデータ容量の観点・アクセス効率の観点どちらからみてもあまり良いものだとは言いにくい。

そこで大量のデータを扱うのに適したデータベースにデータを保存するべきである。また、そもそも仮想通貨でなくともデータサイエンティストであればデータベースを利用するのは一般的。少しでも知識にしておきたい。

どのDBを使うか

大量に流れ込んでくる仮想通貨取引の情報で最も重要なのそのタイムスタンプだ。

「1000BTCが成り行きで売られた」という情報には価値がないが「2020年3月31日21時0分1秒に1000BTCが成り行きで売られた」という情報は非常に重要だ(この例でいえばBITMEXのFundingに関わるのではないか、という予想も立てられる)

値動きの分析では基本的にデータを読み飛ばしたり、特定の1コだけのデータを探すということはない。あったとしても日時を絞り時系列で纏めて分析するはずだ。

そこで最適なのが「時系列データベース」と呼ばれるものでデータを時系列で保存し高効率でアクセスすることに特化しているデータベースだ。

これに関しては「クックパッド開発者ブログ」の記事がとてもわかりやすく参考になった。

スループットの比較で「GridDB」が良さそうに見えたが過激な本家の広告感とWindowsOS環境で気軽にテストできないため「InfluxDB」を選択する運びとなった。

またこれらはいずれも無料で利用できる。

日本語の解説サイトが無い

驚いたことにNode.jsからInfluxDBを利用しているBLOGが日本語で1件もヒットしなかった。

英語ドキュメントを読み進める価値がありそうだ。

サーバーのインストール:Windows

Windows環境での実行は次のサイトが非常にシンプルで分かりやすい「InfluxDBをWindowsで起動する簡単手順(ついでにChronografも)

この通りに行った。(chronografはインストールしない)

クライアント

Node.js版クライアント「Node-Influx」のドキュメントはここだ。新しくNode.jsのパッケージを切っていく。

  1. インストール

    次のコードでインストールする

    npm install influx express

    見れば分かるがサンプルではHTTPサーバ対応のためexpressを組み合わせている。ブラウザでアクセスしたときのUIとして想定しているようだ。

  2. index.jsを新規作成

    公式ではapp.jsを作成となっているがindex.jsとした。次がコピペするコード。

    const Influx = require('influx')
    const express = require('express')
    const http = require('http')
    const os = require('os')
    
    const app = express()
    
    const influx = new Influx.InfluxDB({
       host: 'localhost',
       database: 'express_response_db',
       schema: [
           {
               measurement: 'response_times',
               fields: {
                   path: Influx.FieldType.STRING,
                   duration: Influx.FieldType.INTEGER
               },
               tags: [
                   'host'
               ]
           }
       ]
    })
    
    influx.getDatabaseNames()
       .then(names => {
           if (!names.includes('express_response_db')) {
               return influx.createDatabase('express_response_db');
           }
       })
       .then(() => {
           http.createServer(app).listen(3000, function () {
               console.log('Listening on port 3000')
           })
       })
       .catch(err => {
           console.error(`Error creating Influx database!:` + err);
       })
    
    app.use((req, res, next) => {
       const start = Date.now()
    
       res.on('finish', () => {
           const duration = Date.now() - start
           console.log(`Request to ${req.path} took ${duration}ms`);
    
           influx.writePoints([
               {
                   measurement: 'response_times',
                   tags: { host: os.hostname() },
                   fields: { duration, path: req.path },
               }
           ]).catch(err => {
               console.error(`Error saving data to InfluxDB! ${err.stack}`)
           })
       })
       return next()
    })
    
    app.get('/', function (req, res) {
       setTimeout(() => res.end('Hello world!'), Math.random() * 500)
    })
    
    app.get('/times', function (req, res) {
       influx.query(`
             select * from response_times
             where host = ${Influx.escape.stringLit(os.hostname())}
             order by time desc
             limit 10
           `).then(result => {
           res.json(result)
       }).catch(err => {
           res.status(500).send(err.stack)
       })
    })

    本家のコードでは1行目のrequire文が正しくなくエラーとなるため修正してある。

    また接続エラー時のエラー内容が出力されなかったのでそちらも変更した。

実行

  1. コマンドプロンプトにてInfluxDBをインストールしたフォルダのパスで次を実行

    influxd.exe -config influxdb.conf

    次のような画面が出れば成功。

    InfluxDB

    このウィンドウのまま閉じないように。

  2. Node.jsを作成したフォルダで別のコマンドプロンプトにて次を実行

    node index.js

    Listening on port 3000 と表示されれば成功。

  3. ブラウザにて http://localhost:3000 にアクセスし「Hollo World!」が 表示されれば成功。

    更にhttp://localhost:3000/time にアクセスが可能で、ここではブラウザがアクセスする度に1件づつ各レスポンスタイムが記録されたレコードが増えていく。

なにが起こっているのか

このサンプルを 最近マイブームのアーキテクチャ図のようなもので書いた。

アーキテクチャ図のようなもの

どこかで見たことがある。

そう、私の自動売買システムとそのUIの関係だ。

面白いところはどうやらnode-influx<>InfluxDB間もhttp通信で読み書きしているようだ(ドキュメントのプロトコルがhttpしか表記がないため)

どうでもいい情報だった。

expressに関してはユーザーとのhttp通信を担っているだけなので、私のシステムと同様ここをsocket.ioにしてvue.jsのクライアントから接続というのももちろん可能だ。

重要なのはinfluxというオブジェクトだけだ。

Influxオブジェクト

const influx = new Influx.InfluxDB({....

ここで新たに接続先とDBを指定している。

getDatabaseNames()

登録されているデータベース一覧を配列で返すだけのメソッドのようだ。includeメソッドを使って帰ってきた配列の中に指定したDBがあるかどうか確認しており、存在しなければDBを新たに生成している。

ついでにjavaScriptのincludeメソッドも覚えておきたい。

writePoints()

  influx.writePoints([
      {
          measurement: 'response_times',
          tags: { host: os.hostname() },
          fields: { duration, path: req.path },
      }

ここでDBに書き込みをっている。公式によるとどうやら配列に入っている複数のデータを一括登録するメソッドのようだ。

そして便利なところは時系列DBらしく、タイムスタンプを渡すこと無く自動で付与されるらしい。

measurementが一般のDBの「テーブル」。

tagsが「キー」と覚えておく。

これを見る限りデータはSQL文のような構文ではなく連想配列で入れていけば良いのでSQLが苦手な私には助かる。

query()

influx.query(`
      select * from response_times
      where host = ${Influx.escape.stringLit(os.hostname())}
      order by time desc
      limit 10
    `).then(result => {
    res.json(result)

データを読む場合にはSQL文とほぼ同等のものを使うようだ。

さいごに

今回はサンプルを使って見るだけのざっくりしたものだが思いのほか長くなってしまった。

JSON形式でデータを渡してやれば良さそうなので扱いやすそうだ。

色々実験しながらこれから記事を続けたい。

この記事をシェア:

author icon

仮想トイレ @CrypticToilet
プログラミングや仮想通貨のシステムトレードに関する情報を更新中!どんな情報を流しても詰まらないトイレ。