У нас есть некоторый код, который должен работать быстрее. Его уже представленный, таким образом, мы хотели бы использовать несколько потоков. Обычно я устанавливал бы в очереди памяти и имел бы много потоков, устраивающихся на работу очереди и вычисляющих результаты. Для совместно используемых данных я использовал бы ConcurrentHashMap или подобный.
Я действительно не хочу спускаться по тому маршруту снова. Из того, что я читал, агенты использования приведут к более чистому коду и если я использую akka, мигрирующий больше чем на 1 jvm, должно быть легче. Это верно?
Однако я не знаю, как думать в агентах, таким образом, я не уверен, где запустить.
Для давания лучшее представление о проблеме вот, некоторый пример кода:
case class Trade(price:Double, volume:Int, stock:String) {
def value(priceCalculator:PriceCalculator) =
(priceCalculator.priceFor(stock)-> price)*volume
}
class PriceCalculator {
def priceFor(stock:String) = {
Thread.sleep(20)//a slow operation which can be cached
50.0
}
}
object ValueTrades {
def valueAll(trades:List[Trade],
priceCalculator:PriceCalculator):List[(Trade,Double)] = {
trades.map { trade => (trade,trade.value(priceCalculator)) }
}
def main(args:Array[String]) {
val trades = List(
Trade(30.5, 10, "Foo"),
Trade(30.5, 20, "Foo")
//usually much longer
)
val priceCalculator = new PriceCalculator
val values = valueAll(trades, priceCalculator)
}
}
Я ценил бы его, если бы кто-то с опытом с помощью агентов мог бы предложить, как это отобразилось бы на агентах.
Это дополнение к моему комментарию к общим результатам для дорогостоящих вычислений. Вот он:
import scala.actors._
import Actor._
import Futures._
case class PriceFor(stock: String) // Ask for result
// The following could be an "object" as well, if it's supposed to be singleton
class PriceCalculator extends Actor {
val map = new scala.collection.mutable.HashMap[String, Future[Double]]()
def act = loop {
react {
case PriceFor(stock) => reply(map getOrElseUpdate (stock, future {
Thread.sleep(2000) // a slow operation
50.0
}))
}
}
}
Вот пример использования:
scala> val pc = new PriceCalculator; pc.start
pc: PriceCalculator = PriceCalculator@141fe06
scala> class Test(stock: String) extends Actor {
| def act = {
| println(System.currentTimeMillis().toString+": Asking for stock "+stock)
| val f = (pc !? PriceFor(stock)).asInstanceOf[Future[Double]]
| println(System.currentTimeMillis().toString+": Got the future back")
| val res = f.apply() // this blocks until the result is ready
| println(System.currentTimeMillis().toString+": Value: "+res)
| }
| }
defined class Test
scala> List("abc", "def", "abc").map(new Test(_)).map(_.start)
1269310737461: Asking for stock abc
res37: List[scala.actors.Actor] = List(Test@6d888e, Test@1203c7f, Test@163d118)
1269310737461: Asking for stock abc
1269310737461: Asking for stock def
1269310737464: Got the future back
scala> 1269310737462: Got the future back
1269310737465: Got the future back
1269310739462: Value: 50.0
1269310739462: Value: 50.0
1269310739465: Value: 50.0
scala> new Test("abc").start // Should return instantly
1269310755364: Asking for stock abc
res38: scala.actors.Actor = Test@15b5b68
1269310755365: Got the future back
scala> 1269310755367: Value: 50.0
Для простого распараллеливания, когда я бросаю кучу работы на обработку, а затем жду, пока все это вернется, я предпочитаю использовать Futures шаблон.
class ActorExample {
import actors._
import Actor._
class Worker(val id: Int) extends Actor {
def busywork(i0: Int, i1: Int) = {
var sum,i = i0
while (i < i1) {
i += 1
sum += 42*i
}
sum
}
def act() { loop { react {
case (i0:Int,i1:Int) => sender ! busywork(i0,i1)
case None => exit()
}}}
}
val workforce = (1 to 4).map(i => new Worker(i)).toList
def parallelFourSums = {
workforce.foreach(_.start())
val futures = workforce.map(w => w !! ((w.id,1000000000)) );
val computed = futures.map(f => f() match {
case i:Int => i
case _ => throw new IllegalArgumentException("I wanted an int!")
})
workforce.foreach(_ ! None)
computed
}
def serialFourSums = {
val solo = workforce.head
workforce.map(w => solo.busywork(w.id,1000000000))
}
def timed(f: => List[Int]) = {
val t0 = System.nanoTime
val result = f
val t1 = System.nanoTime
(result, t1-t0)
}
def go {
val serial = timed( serialFourSums )
val parallel = timed( parallelFourSums )
println("Serial result: " + serial._1)
println("Parallel result:" + parallel._1)
printf("Serial took %.3f seconds\n",serial._2*1e-9)
printf("Parallel took %.3f seconds\n",parallel._2*1e-9)
}
}
По сути, идея состоит в том, чтобы создать коллекцию воркеров - по одному на рабочую нагрузку - а затем передать им все данные с помощью !! который немедленно возвращает будущее. Когда вы пытаетесь прочитать будущее, отправитель блокируется, пока работник не закончит работу с данными.
Вы можете переписать приведенное выше так, чтобы PriceCalculator
вместо этого расширял Actor
, а valueAll
координировал возврат данных.
Обратите внимание, что вы должны быть осторожны при передаче неизменяемых данных.
В любом случае, на машине, с которой я набираю это, если вы запустите вышеуказанное, вы получите:
scala> (new ActorExample).go
Serial result: List(-1629056553, -1629056636, -1629056761, -1629056928)
Parallel result:List(-1629056553, -1629056636, -1629056761, -1629056928)
Serial took 1.532 seconds
Parallel took 0.443 seconds
(Очевидно, у меня как минимум четыре ядра; время параллельной обработки довольно сильно варьируется в зависимости от того, какой работник получает какой процессор и что еще происходит на машине.)