Я ищу поток FIFO в Scala, то есть что-то, что обеспечивает функциональность
Поток должен быть закрываемым и должен блокировать доступ к следующему элементу до тех пор, пока элемент не будет добавлен или поток не будет закрыт.
На самом деле я немного удивлен, что библиотека коллекций (кажется) не включает такую структуру данных, поскольку это довольно классическая структура IMO.
Мои вопросы:
1) Я что-то упустил? Существует ли уже класс, обеспечивающий эту функциональность?
2) Хорошо, если он не включен в библиотеку коллекций, это может быть просто тривиальная комбинация существующих классов коллекции. Однако я попытался найти этот тривиальный код, но моя реализация все еще выглядит довольно сложной для такой простой проблемы.Есть ли более простое решение для такого FifoStream?
class FifoStream [T] расширяет Closeable {
val queue = new Queue [Option [T]]
{{1} }} lazy val stream = nextStreamElem
private def nextStreamElem: Stream [T] = next () match {
case Some (elem) => Stream.cons (elem, nextStreamElem )
case None => Stream.empty
}
/ ** Возвращает следующий элемент в очереди (может дождаться его вставки). * /
private def next () = {
queue.synchronized {
if (queue.isEmpty) queue.wait ()
queue.dequeue () { {1}}}
}
/ ** Добавляет новые элементы в этот поток. * /
def enqueue (elems: T *) {
queue.synchronized {
queue.enqueue (elems.map {Some (_)}: _ *) {{1 }} queue.notify ()
}
}
/ ** Закрывает этот поток. * /
def close () {
queue.synchronized {
queue.enqueue (None)
queue.notify ()
} {{ 1}}}
}
Спасибо за ваши предложения. Я немного изменил решение paradigmatic, чтобы toStream возвращал неизменяемый поток (допускает повторяющиеся чтения), чтобы он соответствовал моим потребностям. Вот код для полноты:
import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
lazy val toStream: Stream[A] = queue2stream
private def queue2stream: Stream[A] = queue take match {
case Some(a) => Stream cons ( a, queue2stream )
case None => Stream empty
}
def close() = queue add None
def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}