Как я изменяю сокет TCP, чтобы не заблокироваться?

Потребность в тайм-ауте заставит вас переписать основную часть логики lineStream. С другой стороны, с такой переписью вы можете избежать промежуточного Iterator и напрямую вставить строки в Subject. Для логики тайм-аута вы можете использовать метод Monix timeoutOnSlowUpstream, но вам все равно придется обработать ошибку тайм-аута и закрыть запущенный процесс.

Также есть выбор, что делать с длинным выходом и несколькими подписчиками. В этом коде я решил использовать ограниченный буфер replayLimited. В зависимости от ваших потребностей вы можете выбрать другую стратегию. Вот эскиз решения:

object ProcessHelper {

  import scala.sys.process.{Process, BasicIO}
  import scala.concurrent.duration.FiniteDuration
  import monix.eval.Task
  import monix.execution.Scheduler
  import monix.reactive.subjects.ConcurrentSubject
  import monix.reactive.Observable

  private class FinishedFlagWrapper(var finished: Boolean = false)

  def buildProcessLinesObservable(cmd: String, timeout: FiniteDuration, bufferLines: Int = 100)(implicit scheduler: Scheduler): Observable[String] = {
    // works both as a holder for a mutable boolean var and as a synchronization lock
    // that is required to preserve semantics of a Subject, particularly
    // that onNext is never called after onError or onComplete
    val finished = new FinishedFlagWrapper()

    // whether you want here replayLimited or some other logic depends on your needs
    val subj = ConcurrentSubject.replayLimited[String](bufferLines)

    val proc = Process(cmd).run(BasicIO(withIn = false,
      line => finished.synchronized {
        if (!finished.finished)
          subj.onNext(line)
      }, None))

    // unfortunately we have to block a whole thread just to wait for the exit code
    val exitThread = new Thread(() => {
      try {
        val exitCode = proc.exitValue()
        finished.synchronized {
          if (!finished.finished) {
            finished.finished = true
            if (exitCode != 0) {
              subj.onError(new RuntimeException(s"Process '$cmd' has exited with $exitCode."))
            }
            else {
              subj.onComplete()
            }
          }
        }
      }
      catch {
        // ignore when this is a result of our timeout
        case e: InterruptedException => if(!finished.finished) e.printStackTrace()
      }
    }, "Process-exit-wait")
    exitThread.start()

    subj.timeoutOnSlowUpstream(timeout)
      .guarantee(Task(finished.synchronized {
        if (!finished.finished) {
          finished.finished = true
          proc.destroy()
          exitThread.interrupt()
        }
      }))
  }
}

Пример использования будет выглядеть примерно так:

def test(): Unit = {
  import monix.execution.Ack._
  import monix.reactive._
  import scala.concurrent._
  import scala.concurrent.duration._
  import monix.execution.Scheduler.Implicits.global


  val linesO = ProcessHelper.buildProcessLinesObservable("python3 test.py", 5 seconds, 2) // buffer is reduced to just 2 lines just for this example 

  linesO.subscribe(new Observer[String] {
    override def onNext(s: String): Future[Ack] = {
      println(s"Received '$s'")
      Future.successful(Continue)
    }

    override def onError(ex: Throwable): Unit = println(s"Error '$ex'")

    override def onComplete(): Unit = println("Complete")
  })

  try {
    println(linesO.toListL.runSyncUnsafe())
    println(linesO.toListL.runSyncUnsafe()) // second run will show only last 2 values because of the reduced buffer size
    println("Finish success")
  }
  catch {
    case e: Throwable => println("Failed with " + e)
  }
}
34
задан Qix 10 November 2016 в 04:46
поделиться

6 ответов

What do you mean by "not always reliable"? If the system succeeds in setting your socket non non-blocking, it will be non-blocking. Socket operations will return EWOULDBLOCK if they would block need to block (e.g. if the output buffer is full and you're calling send/write too often).

This forum thread has a few good points when working with non-blocking calls.

20
ответ дан 27 November 2019 в 15:49
поделиться

Если Вы хотите измениться , сокет к не блокированию , точно принимают () к НЕ Заблокированный состояние затем

int flags=fcntl(master_socket, F_GETFL);
    fcntl(master_socket, F_SETFL,flags| O_NONBLOCK); /* Change the socket into non-blocking state  F_SETFL is a command saying set flag and flag is 0_NONBLOCK     */

if((newSocket = accept(master_socket, (struct sockaddr *) &address, &addr_size))<0){
            if(errno==EWOULDBLOCK){
                   puts("\n No clients currently available............ \n");
           }
}else{
      puts("\nClient approched............ \n");
}
2
ответ дан 27 November 2019 в 15:49
поделиться

fcntl () всегда надежно работал у меня. В любом случае, вот функция, которую я использую для включения / отключения блокировки сокета:

#include <fcntl.h>

/** Returns true on success, or false if there was an error */
bool SetSocketBlockingEnabled(int fd, bool blocking)
{
   if (fd < 0) return false;

#ifdef _WIN32
   unsigned long mode = blocking ? 0 : 1;
   return (ioctlsocket(fd, FIONBIO, &mode) == 0) ? true : false;
#else
   int flags = fcntl(fd, F_GETFL, 0);
   if (flags == -1) return false;
   flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
   return (fcntl(fd, F_SETFL, flags) == 0) ? true : false;
#endif
}
84
ответ дан 27 November 2019 в 15:49
поделиться

Обычно вы можете добиться того же эффекта, используя обычную блокировку ввода-вывода и мультиплексирование нескольких операций ввода-вывода с помощью select (2) , poll (2) или некоторые другие системные вызовы, доступные в вашей системе.

См. Проблема C10K для сравнения подходов к масштабируемому мультиплексированию ввода-вывода.

2
ответ дан 27 November 2019 в 15:49
поделиться

Лучший способ настроить сокет как неблокирующий в C - использовать ioctl. Пример, когда принятый сокет настроен на неблокирование, следующий:

long on = 1L;
unsigned int len;
struct sockaddr_storage remoteAddress;
len = sizeof(remoteAddress);
int socket = accept(listenSocket, (struct sockaddr *)&remoteAddress, &len)
if (ioctl(socket, (int)FIONBIO, (char *)&on))
{
    printf("ioctl FIONBIO call failed\n");
}
1
ответ дан 27 November 2019 в 15:49
поделиться

fcntl() or ioctl() are used to set the properties for file streams. When you use this function to make a socket non-blocking, function like accept(), recv() and etc, which are blocking in nature will return error and errno would be set to EWOULDBLOCK. You can poll file descriptor sets to poll on sockets.

16
ответ дан 27 November 2019 в 15:49
поделиться
Другие вопросы по тегам:

Похожие вопросы: