Apache MINAで、バイナリ生データをビジネスロジックレベルのオブジェクト(メッセージ)に変換するのに使うのがFilterChainに組み込めるProtocolDecoder/ProtocolEncoderインターフェイス系のクラスです。ProtocolCodecFactoryを介して登録することで、上記処理を挟むことができます。
この時少し厄介なのが、受信データを扱うDecoder系です。次の処理を実装する必要があります。画像ファイルを添付したときのPOSTデータを変換するケースを想像してみて下さい。
ProtocolDecoderインターフェイスには、上記の処理をより実装しやすくした派生クラスが存在しますが、ドキュメントやexampleではどのようなシーンでどれを使うべきかの指針や、裏側の仕組みの解説が散逸していてまとまっていません。
本記事ではその辺りを覚え書き程度に書き付けておきます。
なお対象は Apache MINA 1.1.7 です。
ProtocolDecoder : 一番単純で、ベースとなるインターフェイス | | | +-> ProtocolDecoderAdapter : finishDecode(), dispose()を空で実装して簡略化したabstractクラス V CumulativeProtocolDecoder : doDecode()実装するだけでオッケーなabstractクラス | V demux.DemuxingProtocolCodecFactory系 : 複数のDecoder/Encoderを混在させたい場合に使う。 MessageDecoder(Adapter)/MessageEncoder(Adapter)を実装してregister()する。
一番単純でベースとなるインターフェイスです。ProtocolDecoderを使うと、データを受信するたびにdecode()が呼ばれます。
ProtocolDecoderは単なるインターフェイスなので、受信データの過不足判定や、不足時に受信データを一時保存する仕組みは開発者が用意する必要があります。
実際にはこれを直接実装することは考えづらく、後述するCumulativeProtocolDecoderやDemuxingProtocolCodecFactory系を使うことになります。
変換実行メソッド | void decode(IoSession, ByteBuffer, ProtocolDecoderOutput) |
decode()実行時のByteBuffer | position = 0, limit = その時受信したデータ長 |
データ長過不足判定 | 自分で実装する必要有り |
データ長不足時の一時保存・連結 | 同上 |
単一のDecoder/Encoderペアしか使わないのであればCumulativeProtocolDecoderを使うのが便利です。
CumulativeProtocolDecoder()はProtocolDecoderインタフェースを実装し、デコード処理の本体は抽象メソッドであるdoDecode()を実装すればOKな作りになっています。
doDecode()はbooleanを返しますが、trueを返せば「メッセージ変換終了」で、falseの場合は「データが足りないか何かでメッセージに変換してない」ことを意味します。
複数回にわたる受信データの連結・確保は、裏側で処理してくれます。
変換実行メソッド | void doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput) |
doDecode()実行時のByteBuffer | position = 0, limit = 前回までのデータ長含む |
データ長過不足判定 | doDecode()がtrueを返すとメッセージ変換完了→一時保存されたバッファ削除。falseを返すとIoSessionのAttributeに保存。 |
データ長不足時の一時保存・連結 | IoSessionのAttributeに保存される。doDecode()を呼ぶ前に自動連結。 |
初回doDecode()がfalseを返すと、初回受信データはIoSessionにsetAttribute()されます。
2回目では、doDecode()が呼ばれる前に
ByteBuffer buf = ioSesison.getAttribute(BUFFER); buf.put(in_byteBuffer); buf.flip()
されます。つまり初回データの後ろに2回目受信データが連結され、flip()によりposition = 0, limit = 初回 + 2回目のデータ長となります。
その後2回目のdoDecode()が呼ばれますが、ここで注意が必要です。
2回目のdoDecode()がfalseを返した場合、 buf.comact()が走ります。
compact()は、position位置 - limitまでのデータを、position = 0にコピー(ずらす)します。
これが3回目のdoDecode()に影響します。
まず2回目のdoDecode()で、position位置が変わるケースを考えてみます。
2回目doDecode()実行前: |<- 1回目のデータ ->|<- 2回目のデータ ->| |1234567890123456789|abcdefghijklmnopqrstu| P=0 Limit 2回目doDecode()実行後: |<- 1回目のデータ ->|<- 2回目のデータ ->| |1234567890123456789|abcdefghijklmnopqrstu| P = 9 Limit buf.compact() |<- #1 ->|<- 2回目のデータ ->| |0123456789|abcdefghijklmnopqrstu| P = 0 Limit 3回目doDecode()実行前: |<- #1 ->|<- 2回目のデータ ->|<- #3 ->| |0123456789|abcdefghijklmnopqrstu|ABCDEFGH| P = 0 Limit
続いて2回目のdoDecode()でposition位置が変わらないケースを見てみます。
2回目doDecode()実行前: |<- 1回目のデータ ->|<- 2回目のデータ ->| |1234567890123456789|abcdefghijklmnopqrstu| P = 0 Limit 2回目doDecode()実行後: (position 変化無し、上と同じ) buf.compact() (position = 0なのでcompact()されても上と変わらない) 3回目doDecode()実行前: |<- 1回目のデータ ->|<- 2回目のデータ ->|<- #3 ->| |1234567890123456789|abcdefghijklmnopqrstu|ABCDEFGH| P = 0 Limit
このように、doDecode()内でのposition変化有無で、3回目のBufferByteの中身が大きく異なってしまいます。
doDecode()内でByteBufferのpositionを動かすということは、意味としては「positionを動かした分だけデータを処理したのでバッファから削除する」と考えます。buf.compact()により、動かした分のデータが消えてしまいます。
buf.compact()が走ることを知らずに、例えば先頭4バイトint型で続くbodyデータ長を示すプロトコルを処理する場合に次のように書いてしまうと、
void doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) ... { int len = in.getInt(); // 相対get()で、4byte分positionが前進する if (len >= in.hasRemaining()) { return false; } else { // 変換コード out.write(myMessage); return true; } }
2回目までで全て受信出来る場合は正常に動作します。しかし3回目以降に全て受信出来る場合は、2回目の後のbuf.compact()で先頭4バイトint型のデータ長が削られてしまいますので、doDecode()の最初のgetInt()では本来ならBODY部の最初の4バイトを、データ長として読み込んでしまうことになります。
対処として幾つか思い浮かぶのは、
などがあり得ます。ByteBufferをコピーするのは若干メモリパフォーマンスで不安があります。絶対getを使うのは安心出来る手法です。
但し複雑なプロトコルフォーマットを扱う場合は絶対getの位置指定が大変です。その場合は、doDecode()冒頭でpositionをバックアップしておき、以降は相対getを使い、必要なデータを取得し終えてから元に戻してもOKだと思います。
void doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) ... { int oldPos = in.position(); int len = in.getInt(); // 相対get()で、4byte分positionが前進する in.position(oldPos); // バックアップしておいた位置を復元 if (len >= in.hasRemaining()) { return false; } else { // 変換コード out.write(myMessage); return true; } }
ここで一旦まとめると、doDecode()内のposition変化により、3回目以降のdoDecode()でのByteBufferの内容が変わります。
「後続ボディのデータ長を含むヘッダー」+「後続ボディ」で分かれるプロトコルを処理する場合に、この点を考慮しておかないとバグの原因になりますので注意が必要です。
複数のDecoder/Encoderを組み合わせる場合はこちらになります。MessageDecoder/MessageEncoderをimplementsしたクラスを用意し、必要なだけDemuxingProtocolCodeFactoryでregister()します。
中身ですが、登録されたMessageDecoder/EncoderをProtocolDecoder/Encoderでラッピングしています。正確には、DecoderについてはCumulativeProtocolDecoderでラップしています。MessageDecoder/Encoderそれぞれを1:1にProtocolDecoder/Encoderにラップしているのではなくて、Decoder/Encoder毎にまとめて一つのProtocolDecoder/Encoderでラップします。
DemuxingProtocolCodecFactory | +-> ProtocolEncoderImpl implements ProtocolEncoder : MessageEncoderを束ねる。 +-> ProtocolDecoderImpl extends CumulativeProtocolDecoder : MessageDecoderを束ねる。
実際にdecode/encode処理でどのDecoder/Encoderが使われるのかは、次の仕組みで決定されます。
Decoder系 : decodable()がOKを返したDecoderが使われる。 Encoder系 : 送信メッセージのClassが、getMessageTypes()が返すClassと一致したものが使われる。
DemuxingProtocolCodecFactory系の実装は複雑になってしまっていますが、使う方は簡単になるように出来ています。
ByteBufferとメッセージの変換で、どうしても複数のメッセージクラスを使い分ける必要があれば、こちらを使った方が見通しが良くなると思います。
ところで、MessageDecoderはCumulativeProtocolDecoderのdoDecode()内で使われます。大雑把な構成と流れは次のようになっています。
DemuxingProtocolCodecFactory -> Inner Class, ProtocolDecoderImpl extends CumulativeProtocolDecoder -> doDecode() -> decodable()がOKを返すMessageDecoderを探す。 -> OKを返したMessageDecoderでdecode()を呼ぶ。
変換実行メソッド | decodable(), decode()の二段階 |
ByteBuffer | doDecode()に準ずる |
データ長過不足判定 | decodable(), decode()の戻り値で判別 |
データ長不足時の一時保存・連結 | doDecode()に準ずる。 |
doDecode()の中でdecodable()/decode()が呼ばれますので、ByteBufferのpositionに関しても同様の注意を払う必要があります。
但しdecodable()は例外的に、呼び出しの前後でpositionとlimitをバックアップ・復元しています。従ってdecodable()の内部で、どんなにpositionとlimitを操作しても、2回目以降や続いて呼ばれるdecode()には全く影響しません。
以下にProtocolDecoderImpleのdoDecode()のエッセンスを抽出したコードを示します。
private MessageDecoder currentDecoder; @Override protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception { if (currentDecoder == null) { // this.decodersというのがDemuxingProtocolCodecFactoryにregister() // されたMessageDecoderの一覧 MessageDecoder[] decoders = this.decoders; for (int i = decoders.length - 1; i >= 0; i--) { MessageDecoder decoder = decoders[i]; int limit = in.limit(); int pos = in.position(); MessageDecoderResult result; try { result = decoder.decodable(session, in); } finally { // decodable()中で変更されたposition/limitを戻している in.position(pos); in.limit(limit); } if (result == MessageDecoder.OK) { // OKを返すMessageDecoderが見つかれば、それを // currentDecoderにセットしてforループを抜ける currentDecoder = decoder; break; } } if (currentDecoder == null) { // Decoder is not determined yet (i.e. we need more data) return false; } } MessageDecoderResult result = currentDecoder.decode(session, in, out); if (result == MessageDecoder.OK) { currentDecoder = null; // doDecode()変換完了、trueを返す。 return true; } else if (result == MessageDecoder.NEED_DATA) { // doDecode()変換未完了、falseを返す。 return false; } else { // NOT_OK・上記以外の場合はcurrentDecoderをnullにして例外発生 } }
decodable()中でのposition/limitの変更はリセットされますので、doDecode()と同様の注意はdecode()内でのみ必要だと言うことが分かります。
Apache MINAにおけるProtocolDecoder周辺のインターフェイス・クラスの使い分けと注意点について簡単にまとめました。
ProtocolDecoder, CumulativeProtocolDecoder, DemuxingProtocolCodecFactory系は、decode処理に関してはそれほど難しい作りにはなっておらず、ソースコードも比較的読みやすい方だと思います。
もし本記事の解説で疑問点が出てきたり、あるいは他の内部実装に関連した不安事項がある場合は、ぜひ実際のソースコードに目を通してみて下さい。
コメント