コミケ告知

サークル networkmaniacs.net(旧:浜風もっこす) 2017.02.17 デブサミ内 DevBooksに出展します。
詳細は circle タグの記事へ。
2013年2月9日土曜日

RxJavaをScala(+SBT)から試してみる

# はてなダイアリーから移動した記事です。あまり真面目に整形していません。

C#のReactive ExtensionsのJava版*1であるRxJavaが、NetflixからApache License Version 2.0で公開されていました。

Rxについては

SBTのコンフィグレーション

rxjava-scalaへの依存性を書けばOK。依存しているrxjava-coreも取ってきてくれるので。

libraryDependencies += "com.netflix.rxjava" % "rxjava-scala" % "0.5.+"

最初見たときは0.5.0でしたが、今見たら0.5.1でした。

動作確認

ScalaAdaptorでテストに使われているサンプルほぼそのまま*2。くだらないけど、これが動かなければ設定がおかしいので、まずはここから。

    Observable.toObservable("1", "2", "3").take(1).subscribe(Map(
      "onNext" -> ((callback: String) => {
        println("testTake: callback = " + callback)
      })
    ))
run
[info] Running info.moccos.Main
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
testTake: callback = 1

動いているようです。
SLF4Jが邪魔ですが、ログは今は出なくてもいいので見なかったことに。(以後のログでも省きます)

java.lang.RuntimeException: Unsupported closure type というエラー

サポートしていない型をObservable.toObservableに渡すと、そんな型知らんとRuntimeExceptionを飛ばされます。最初、rxjava-core(ScalaのAdaptorがない)を引っ張ってきていたときに遭遇しました。

もう少し使ってみる

    val xs = 1 to 10
    Observable.toObservable(xs)
      .take(3)
      .subscribe(Map(
        "onNext" -> ((x: Int) => {
          println("testTake: callback = " + x)
        })
      ))
[info] Running info.moccos.Main
[error] (run-main) java.lang.ClassCastException
java.lang.ClassCastException
[trace] Stack trace suppressed: run last compile:run for the full output.
java.lang.RuntimeException: Nonzero exit code: 1
        at scala.sys.package$.error(package.scala:27)
[trace] Stack trace suppressed: run last compile:run for the full output.
[error] (compile:run) Nonzero exit code: 1

ぬ?
ここでonNextで拾う型をとりあえずAnyにしてみると…

testTake: callback = Range(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)

お、おう。テストコードにあったようなtupleだと、ScalaAdapterが直接変換してくれて上手くいきますが、コレクションの変換はまだ実装されていないみたいです。明示的にJavaの理解できる型を渡してあげないと。

    val xs: java.util.List[Int] = 1 to 10

または

    Observable.toObservable[Int](xs)
[info] Running info.moccos.Main
testTake: 1
testTake: 2
testTake: 3

これはOK。ちなみにStreamの無限数列を渡したら、無限に数えます。push型=値を生成できる限りどんどんpushするのだから、そりゃ当然か。

使える操作

rx-coreのpackage rx.operatorsを見ればわかりそうです。

参考までに本家Rxのものは…もう2年半前で古い記事ですがこれを。

合成したり取得する値を選んだりという基本中の基本だけ備わっている状態です。それから、onCompletedって書いても動いてくれないみたい。まだ趣味プロジェクトの範囲内かなあ。時間系が入ってくると面白いんですが。
pull requestにあるように、Netflixの狙い通り(?)にいくつか飛んできているので、興味を持った人たちによってじわじわと増えるのでしょう。*3

雑感

生まれたてです。既に実戦投入されたものを公開したわけではないので、これからみんなでがんばろうぜ!と煽っている段階ですね。
Scalaだと、Akka発のFutureが強力だし、元々マッチングも手軽に色々できるので、rxjavaすげー!世界が変わる!ということにはならないかもしれませんが、今後どうなるのか楽しみではあります。

*1:JVM版と呼ぶべきかどうか

*2:テスト文取って改行を足した。

*3:お前も参加しろと言われると、力不足ですまんと答えるほかない…

0 件のコメント:

コメントを投稿