ご無沙汰しています。前回の投稿から、だいぶ間が空いてしまいました。
Aizuna
今回は、Rust言語を使ってチャットサービス上でサイコロを振れるチャットボットを作りました。
名称は「Aizuna」です。
Rustで記述されているのが特徴です。
チャットの入力をコマンドとして、ユーザ管理をしながらデータベースを書き換えるということをしています。 CRUDと呼ばれる操作です。Webサービスのバックエンドなんかでやる事ですね。
AizunaはテーブルトークRPGの「深淵」に対応してカードの管理を行うことができます。 テーブルトークRPGについては本投稿では深くは触れません。
Aizunaは組み込みデータベースとしてLevelDBのRust実装である rusty-leveldbを採用しています。
デフォルトでは無効化していますが、コルーチンによる動作も搭載しています。 fringeを採用しています。
ドキュメントとコード
Aizunaはgithubで公開しています。
実行はGentoo Linux amd64 profile 17.1で確認しました。
動作環境
仮想環境のWindows(msys2 + rust nightly x86_64-pc-windows-gnu)では 依存しているライブラリが通せませんでした。 Windowsで動かしたい場合は、Windowsストアの Ubuntu等のLinuxディストリビューションが使えるかもしれません。
Cloud9でも動作確認できましたので、そういったクラウドIDE環境も使用できるかと思います。
Rust
RustはMozilla製のプログラミング言語です。 説明によると「速度、安全性、並行性の3つのゴールにフォーカスしたシステムプログラミング言語」 とあります。
Rustのパラダイム
Wikipediaによると次のようにあります。
- 手続き型プログラミング
- オブジェクト指向プログラミング
- 関数型プログラミング
- 並列アクターモデル
現状の自分の考えでは、この辺りパラダイムの話は「結局マシン語になるから……」 という気構えで緩く受け止めるほうが良いという気がします。
手続き型と称されるCでも、関数ポインタを取り回してオブザーバパターンを構築することもよくあります。 言語によっては、パラダイムの一部分の機能を実現するに滞っていたりします。 そもそも概念を実現する下層のレイヤーでは、 同じ手法を別名で表現してるだけだったりするようにも思います。
「これは関数型言語なので!」等と気負うと、本質的な部分が見え難くなってしまうかなと感じます。
言うならば、歴史の上で少しづつ研鑽されていった技法に、 時代毎に名前が付けられていったようなものでしょうか。
Rustに於いてはそれらのパラダイムが上手く統合できると踏まえて採用されています。 それぞれのパラダイムの利点を上手く選択することが要点になってくると思います。
後述しますが、Aizunaでは 「並列アクターモデルによるメッセージパッセンジャーをスレッドとコルーチンの二通りの方法で実現する」 という事を行なっています。
Aizuna詳説
Rustの採用を始め、 Aizunaにはいろいろと「やってみよう」という試みを盛り込んであります。
そのため、全体的にいささかキメラな構成になっています。 本投稿のネタにするために敢えてそうした部分もありますので、追って説明していきます。
ただ、Rustの知識が十分ではないため、イディオム的に間違っていたり、 非効率な記述をしている部分はあると思います。
ステップ数
現段階でコードステップは下記のようになりました。 良くない事ですが、テストをほぼ書いていないので、実処理のみの行数です。
$ cargo count ./src
Gathering information...
Language Files Lines Blanks Comments Code
-------- ----- ----- ------ -------- ----
Rust 43 8719 409 1663 6647
-------- ----- ----- ------ -------- ----
Totals: 43 8719 409 1663 6647
制作の記録をみると、全体ではひと月弱ほどの時間を掛けています。 前半は採用ライブラリの選択に費していました。 後半はひたすらドキュメント整備でした。
言語採択
創り始めるに当って言語を選ぶことから始めました。 C++かNode.jsかRustかPythonかNimかといったところで、 ライブラリとして下記があるものを探しました。
- Discord
- 組み込みデータベース
- コルーチン
Node.jsでWebをインターフェースとし、 フロントとバックを同一言語でやるというのも惹かれたのですが、 別のツールでこっそりやったので今回はなし。 情報の多いC++ですが、力尽きたらC++に逃げようと考えて外し、 Pythonは安定にすぎるかなといった感じで外しました。 Nimには、Rustで書いたOpenGL表示のプログラムを移してみたことがあります。 ほぼ1対1で移植できて、速度とサイズの両面で向上が見られました。 コンパイルのバグを踏んだこともありましたが、良い言語です。
今回は理解を進めたかった事もあり、Rustを選択しました。
ライブラリ選択
有用なライブラリは沢山利用しているのですが、取り分け目につくものを上げて紹介します。
Discord disdord-rs
利用するチャットサービスにDiscordを選択しました。 Discordと接続するためのライブラリとしてdiscord-rsを採用しました。
組み込みデータベース LevelDB
アプリケーションに組み込めるデータベースといえば、SQLite3が選択肢です。 しかし今回は、NoSQLのkey-valueDBを使いたかったので、その方向で別の物を探しました。
結果としてgoogle製のLevelDBを使用することにしましたが、 そのラッパーであるleveldb-rsの使用は見送りました。
せっかくRustなのだから、ということで RustのみでLevelDBの機能を提供しているrusty-leveldbを採用しました。 「クロスコンパイルを行なう場合、Rustのみで記述されていることが利点になるのでは」と 期待したという理由もあります。
コルーチン fringe
コルーチンはスレッドに似ています。 処理を並列に動作しているように記述することを可能にします。
コルーチンのコンテキストスイッチはOSスレッドのそれを使用しません。 ライブラリの独自実装でコンテキストスイッチを実現し、 プロセスを切り換えずに継続する処理を入れ換えます。
独自の軽量なコンテキストスイッチによる高速化。 プロセスを切り換えないためミューテックスのようなマルチスレッド保護が必要なくなること。 これらによって、処理速度の向上が見込めることがコルーチンの利点です。
fringe
コルーチンライブラリにはfringeを採用しました。 yieldだけではなくresumeからも値を返せるのが特徴です。
extern crate fringe;
use fringe::{Generator, OsStack};
fn main() {
let stack = OsStack::new(0).unwrap();
let mut gen = Generator::new(stack, move |yielder, mut input| {
for i in 1..4 {
println!("loop: {:?}", input); // loop(n)
input = yielder.suspend(i);
}
});
println!("{:?}", gen.resume(0)); // Some(1)
println!("{:?}", gen.resume(1)); // Some(2)
println!("{:?}", gen.resume(2)); // Some(3)
println!("{:?}", gen.resume(3)); // None
}
これによってメッセージパッセンジャーやイベントシステムのような機構を実現することができます。
Aizunaでは、複数のGeneratorを保持しておいて、 順番にresumeすることでイベントループを作成しています。
実装の詳細
以下ではAizuna特有の実装の内部について解説します。
イベントループ
- AizunaはチャットサービスにCommand::Listenを送ります。
- するとチャットサービスはユーザの入力を待ち、その入力をレスポンスとしてAizunaに返します。
- Aizunaは返って来たレスポンスによって処理を行ない、対応するコマンドを返します。
- Command::Sendのような表示命令がチャットサービスに送られると、結果としてユーザに表示されます。
- 最初に戻ります
このようにコマンドを送り、レスポンスを受け取るということを繰り返すことでAizunaは動作しています。
レスポンスとコマンド
Discordのようなチャットサービスと連携するためにレスポンスとコマンドという構造が定義されています。
enum Responce
コンソールやチャットサービスからの応答を表すEnumです。
use super::{Error, MessageAelicit};
/// enum Responce
#[allow(variant_size_differences)]
#[derive(Debug)]
pub enum Responce {
/// エラー
Error(Error),
/// 処理の区切り
Yield,
/// メッセージ
Message(MessageAelicit),
}
MessageAelicitについては後述します。
enum Command
コンソールやチャットサービスへ送る命令を表すEnumです。
use std::collections::BTreeSet;
use super::MessageAelicit;
/// enum Command
#[allow(variant_size_differences)]
#[derive(Debug)]
pub enum Command {
/// 終了を要求する
Quit(Option<MessageAelicit>),
/// 応答を待つ
Listen,
/// チャンネルに文字列を表示する
Send(MessageAelicit, String),
/// メンバーに文字列を表示する
Whisper(BTreeSet<String>, String),
/// チャンネル、メンバー、送信者それぞれに文字列を表示する
SendWhisperMine(
(MessageAelicit, String),
(BTreeSet<String>, String),
(String, String),
),
}
trait Message
コンソールやチャットサービスからの応答を抽象化したTraitです。 スレッド間で受け渡すためにSync,Sendをderiveすることを要求しています。
use std::fmt::Debug;
/* MessageAelicit = Arc<RwLock<Box<T>>> where T: Any + Message */
aelicit_define!(aelicit_message, Message);
pub use self::aelicit_message::Aelicit as MessageAelicit;
pub use self::aelicit_message::WeakAelicit as MessageWeakAelicit;
pub use self::aelicit_message::EnableAelicitFromSelf as MessageEAFS;
pub use self::aelicit_message::EnableAelicitFromSelfField as MessageEAFSField;
/// trait Message
pub trait Message: Debug + Send + Sync + MessageEAFS {
// ========================================================================
/// チャットサービス固有のメッセージの実体を得る
fn as_any(&self) -> &::std::any::Any;
// ========================================================================
/// チャットサービスのタイプを得る
fn as_connector_type(&self) -> &str;
// ========================================================================
/// チャットサービスの識別名を得る
fn as_connector_id(&self) -> &str;
// ========================================================================
/// メッセージの送信者の識別名を得る
fn as_author_id(&self) -> &str;
// ========================================================================
/// メッセージの送信者の表示名を得る
fn as_author_name(&self) -> &str;
// ========================================================================
/// メッセージの送信者のメンションを得る
fn as_author_mention(&self) -> &str;
// ========================================================================
/// メッセージが発生したチャンネルの識別名を得る
fn as_channel_id(&self) -> &str;
// ========================================================================
/// メッセージの内容を得る
fn as_content(&self) -> &str;
}
Aelicitという単語が見られますが、 この中身はArc<RwLock<Box<T>>> where T: Any + Trait
という構造です。 ElicitというRc<RefCell<Box<T>>> where T: Any + Trait
のArc版です。
RcやArcに機能を追加して、C++のstd::shared_ptrのように使用するために作成しました。 一応、hanepjiv/elicit-rsで公開しています。 ドキュメントはコメントが少しあるだけなのですが……。
EnableElicitFromSelf / EnableAelicitFromSelfという機能があり、 std::shared_ptrのenable_shared_from_thisのように、 自分自身を差すElicit / Aelicitを取得できるようになっています。
AizunaではAelicitによってメッセージの抽象から実体を取得しています。
Rustで多態を実現するには、enumによるvariantや、 Trait(2018年 2月 現在はnightlyのimpl Trait等も)、 Boxによる動的ディスパッチがあります。
Aelicitは最後のBoxによる実装を行なっています。
コルーチンによる実装
コルーチンで記述した場合、前述の動作はかなり直感的に書き下すことができます。 libfringeのresumeが値を返せることが有効に働いているおかげです。
impl Aizuna {
pub fn gen(mut self, stack_size: usize) -> Result<()> {
info!("Aizuna: Fringe");
// ConnectorからlibfringeのGeneratorを作成してVecDequeに収集する
let mut gens = VecDeque::default();
for x in self.connectors.iter() {
gens.push_back(RefCell::new(x.gen(::fringe::OsStack::new(stack_size)?)?));
}
// 有効なGeneratorがある限りループ
while let Some(con) = gens.pop_front() {
// VecDequeの先頭から取り出す
// Command::Listenを発行 responce(= Message)を受けとる
let mut res = con.borrow_mut().resume(Command::Listen);
debug!("{:?} / {:?}", con, res);
// GeneratorがNoneを返すまでループ
while let Some(r) = res {
match r {
Responce::Error(x) => {
// Generatorがエラー -> 終了を要求
eprintln!("Aizuna::run: Error: {:?}", x);
res = con.borrow_mut().resume(Command::Quit(None));
}
Responce::Yield => {
// Generatorが待機状態 -> キューの最後に送る
gens.push_back(con);
break; // 次のGeneratorを処理
}
Responce::Message(ref message) => {
// Message受信 -> 処理
match self.on_message(message) {
Err(x) => {
// Message処理でエラー
eprintln!("Aizuna::on_message: Error {:?}", x);
res = con.borrow_mut().resume(Command::Send(
message.clone(),
String::from("Inner error occured."),
))
}
Ok(None) => {
// Message処理終了 -> キューの最後に送る
gens.push_back(con);
break; // 次のGeneratorを処理
}
Ok(Some(cmd)) => {
// 文字列送信などのコマンド発行
res = con.borrow_mut().resume(cmd);
}
}
}
}
::std::thread::yield_now(); // CPU時間を別スレッドに譲渡
}
// DBの書き込みバッファをフラッシュ
debug!("Aizuna: DB.flush");
if let Err(x) = self.db.flush() {
eprintln!("Aizuna::spawn: DB.flush: {:?}", x);
}
}
// 終了
info!("Aizuna: Stop");
Ok(())
}
}
コルーチンとスレッド
コルーチンの利点は前述の通り、処理効率が高くなることが期待できることです。 しかし実を言えば、今回のチャットボットはそれほど高効率で動作する必要があるものではありません。
実際にコルーチンで記述したところ、 プロセスの切り換えがないためCPUコア1つを占有して消費しつづけている様でした。 これにより確かに高い応答性は期待できますが、電力消費量が上がりますし、 他のプロセスのCPU時間を奪っていることが気になります。 処理時間を譲渡する命令(thread yield)をループに挟んでいるので問題ではないのですが、 CPU使用率を減らした方が良いように思えました。
そもそもコルーチンで書き始めたのは、 libfringeのresumeの機能がイベントループを簡潔に記述できそうだと感じたためです。 この点は予想通り、快適に書き下すことができました。 と同時に、一度書いたことで設計の細部を十分に把握することができました。
そこで、本末転倒ではあるものの、 CPU使用率の問題を解消するため同様の処理を完全にスレッドに置き換える事にしました。 コルーチンによる動作はオプションの機能とし、デフォルトではOSスレッドをサポートするものとします。 結果として予想された応答性能の低下も、 当然の事ですが体感できる程度のものではなかったので良しとしました。
スレッドによる実装
イベントループとして記述するため少しトリッキーな実装になりました。
impl Aizuna {
pub fn spawn(&mut self) -> Result<()> {
info!("Aizuna: Thread");
let (res_rec, handles) = {
// このブロックはとても重要です
// res_sen(送信チャンネル)の参照数をスレッドの数と等しくする必要があるからです
// ブロックに包まなかった場合、res_senのライフタイムが伸びて
// res_rec.recv_timeoutがDisconnected(全スレッドの終了)を返さなくなります
// responce (= Message) を送ってもらうためのチャンネルを作成します
let (res_sen, res_rec) = ::std::sync::mpsc::channel();
// 複数のスレッドを起動し、handlesに収集します
let mut handles = Vec::default();
for x in self.connectors.iter() {
// 初回のCommand::Listenはx.spawnにより、自動で送出されます
handles.push(x.spawn(res_sen.clone())?); // res_senを渡します
}
(res_rec, handles)
};
// ====================================================================
/// Message受け取り時に起こりうるエラーを列挙します
enum RecvErr {
Disconnected,
Timeout,
Quit,
SendError(::std::sync::mpsc::SendError<Command>),
}
// --------------------------------------------------------------------
/// ::std::sync::mpsc::SendErrorからRecvErrに自動変換できるようにしておきます
impl From<::std::sync::mpsc::SendError<Command>> for RecvErr {
fn from(e: ::std::sync::mpsc::SendError<Command>) -> Self {
RecvErr::SendError(e)
}
}
// ====================================================================
/// コマンド送信 -> メッセージ受け取り
fn recv(aizuna: &mut Aizuna, res_rec: &ResRec) -> ::std::result::Result<(), RecvErr> {
debug!("Aizuna: Recv");
// 2000ms メッセージを待つ
match res_rec.recv_timeout(Duration::from_millis(2000)) {
Err(RecvTimeoutError::Disconnected) => {
// 全スレッドが終了
Err(RecvErr::Disconnected)
}
Err(RecvTimeoutError::Timeout) => {
// タイムアウト
Err(RecvErr::Timeout)
}
Ok((_, None)) => {
// チャットサービスとの接続が終了
Err(RecvErr::Quit)
}
Ok((Responce::Error(ref x), Some(ref cmd_sen))) => {
// チャットサービス側でエラー
eprintln!("Aizuna::spawn: {:?}", x);
Ok(cmd_sen.send(Command::Quit(None))?)
}
Ok((Responce::Yield, Some(ref cmd_sen))) => {
// メッセージなし これはコルーチンモードのための応答です
debug!("Responce::Yield");
Ok(cmd_sen.send(Command::Listen)?) // 次のメッセージを要求
}
Ok((Responce::Message(ref message), Some(ref cmd_sen))) => {
// メッセージ受信
debug!("Responce::Message({:?})", message);
match aizuna.on_message(message) {
Err(x) => {
// メッセージ処理時にエラー ユーザに表示する
eprintln!("Aizuna::on_message: Error {:?}", x);
Ok(cmd_sen.send(Command::Send(
message.clone(),
String::from("Inner error occured."),
))?)
}
Ok(None) => {
// メッセージ処理完遂 次のメッセージを要求
Ok(cmd_sen.send(Command::Listen)?)
}
Ok(Some(cmd)) => {
// メッセージ処理継続 コマンド送出
Ok(cmd_sen.send(cmd)?)
}
}
}
}
}
// イベントループ
loop {
match recv(self, &res_rec) {
Err(RecvErr::Disconnected) => {
// 全スレッドが終了
break;
}
Err(RecvErr::Timeout) => {
// タイムアウトしたなら余力ありとみなし、データベースをフラッシュする
debug!("Aizuna: DB.flush");
let _ = self.db.flush()?;
}
Err(RecvErr::SendError(x)) => {
// コマンド送信時にエラー発生 報告してリトライ
eprintln!("Aizuna::spawn: SendError: {:?}", x)
}
Err(RecvErr::Quit) => {
// チャットサービスの1つが終了
debug!("Responce::Quit");
}
Ok(_) => {
// 正常動作 継続
}
}
}
// スレッドハンドルにjoinして全てのスレッドの終了を確認します
for x in handles {
match x.join() {
Err(x) => eprintln!("Aizuna: spawn: join: {:?}", x),
Ok(Err(x)) => eprintln!("Aizuna: spawn: join: {:?}", x),
Ok(Ok(_)) => {}
}
}
info!("Aizuna: Stop");
Ok(())
}
}
注意が必要なのは、レスポンス用のチャンネルが 「レスポンスとコマンド送出用のチャンネルのタプル」を返してくるところです。
// ============================================================================
pub type CmdSen = ::std::sync::mpsc::Sender<Command>;
pub type CmdRec = ::std::sync::mpsc::Receiver<Command>;
// ----------------------------------------------------------------------------
pub type ResSen = ::std::sync::mpsc::Sender<(Responce, Option<CmdSen>)>;
pub type ResRec = ::std::sync::mpsc::Receiver<(Responce, Option<CmdSen>)>;
// ============================================================================
match res_rec.recv_timeout(Duration::from_millis(2000)) {
Ok((Responce::Message(ref message), Some(ref cmd_sen))) => ...
...
}
このcmd_senがどのスレッド(=チャットサービス)がレスポンスを送ってきたのかという情報を含んでいます。 実装ではこれに依存しないように設計してあるため、 どのスレッドがレスポンスを返しても同じ処理で済むようになっています。
ドキュメント制作
Aizunaのドキュメントはmdbookを利用して作成しました。 Aizunaのソースコードのmdbookフォルダにソースが入っています。
GitHubのpagesの機能でgithub.ioに公開するため下記のコマンドでdocsフォルダに生成しています。
おわりに
チャットサービス上でサイコロを振れるチャットボットAizunaを紹介しました。
その実、「Rustでコルーチンが使いたくてやってみました。結局スレッドでも書きました」 という在り様な散文であります。
もし本記事を根気よく読んでくださった方がいらっしゃいましたら、うれしいかぎりです。 どうもありがとうございました。
nice.
返信削除