Есть ли поток FIFO в Scala?

Я ищу поток FIFO в Scala, то есть что-то, что обеспечивает функциональность

  • неизменяемого .Stream (поток, который может быть конечным и запоминать элементы, которые уже были прочитаны)
  • mutable.Queue (который позволяет добавлять элементы в FIFO)

Поток должен быть закрываемым и должен блокировать доступ к следующему элементу до тех пор, пока элемент не будет добавлен или поток не будет закрыт.

На самом деле я немного удивлен, что библиотека коллекций (кажется) не включает такую ​​структуру данных, поскольку это довольно классическая структура IMO.

Мои вопросы:

  • 1) Я что-то упустил? Существует ли уже класс, обеспечивающий эту функциональность?

  • 2) Хорошо, если он не включен в библиотеку коллекций, это может быть просто тривиальная комбинация существующих классов коллекции. Однако я попытался найти этот тривиальный код, но моя реализация все еще выглядит довольно сложной для такой простой проблемы.Есть ли более простое решение для такого FifoStream?

     class FifoStream [T] расширяет Closeable {
     
    val queue = new Queue [Option [T]] 
     {{1} }} lazy val stream = nextStreamElem 
     
    private def nextStreamElem: Stream [T] = next () match {
    case Some (elem) => Stream.cons (elem, nextStreamElem ) 
    case None => Stream.empty 
    } 
     
     / ** Возвращает следующий элемент в очереди (может дождаться его вставки). * / 
    private def next () = {
    queue.synchronized {
    if (queue.isEmpty) queue.wait () 
    queue.dequeue () { {1}}} 
    } 
     
     / ** Добавляет новые элементы в этот поток. * / 
    def enqueue (elems: T *) {
    queue.synchronized {
    queue.enqueue (elems.map {Some (_)}: _ *) {{1 }} queue.notify () 
    } 
    } 
     
     / ** Закрывает этот поток. * / 
    def close () {
    queue.synchronized {
    queue.enqueue (None) 
    queue.notify () 
    } {{ 1}}} 
    } 
     

Решение Paradigmatic (немного изменено)

Спасибо за ваши предложения. Я немного изменил решение paradigmatic, чтобы toStream возвращал неизменяемый поток (допускает повторяющиеся чтения), чтобы он соответствовал моим потребностям. Вот код для полноты:

import collection.JavaConversions._
import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}

class FIFOStream[A]( private val queue: BlockingQueue[Option[A]] = new LinkedBlockingQueue[Option[A]]() ) {
  lazy val toStream: Stream[A] = queue2stream
  private def queue2stream: Stream[A] = queue take match {
    case Some(a) => Stream cons ( a, queue2stream )
    case None    => Stream empty
  }
  def close() = queue add None
  def enqueue( as: A* ) = queue addAll as.map( Some(_) )
}

14
задан Stefan Endrullis 27 September 2011 в 12:52
поделиться