Потребность в тайм-ауте заставит вас переписать основную часть логики lineStream
. С другой стороны, с такой переписью вы можете избежать промежуточного Iterator
и напрямую вставить строки в Subject
. Для логики тайм-аута вы можете использовать метод Monix timeoutOnSlowUpstream
, но вам все равно придется обработать ошибку тайм-аута и закрыть запущенный процесс.
Также есть выбор, что делать с длинным выходом и несколькими подписчиками. В этом коде я решил использовать ограниченный буфер replayLimited
. В зависимости от ваших потребностей вы можете выбрать другую стратегию. Вот эскиз решения:
object ProcessHelper {
import scala.sys.process.{Process, BasicIO}
import scala.concurrent.duration.FiniteDuration
import monix.eval.Task
import monix.execution.Scheduler
import monix.reactive.subjects.ConcurrentSubject
import monix.reactive.Observable
private class FinishedFlagWrapper(var finished: Boolean = false)
def buildProcessLinesObservable(cmd: String, timeout: FiniteDuration, bufferLines: Int = 100)(implicit scheduler: Scheduler): Observable[String] = {
// works both as a holder for a mutable boolean var and as a synchronization lock
// that is required to preserve semantics of a Subject, particularly
// that onNext is never called after onError or onComplete
val finished = new FinishedFlagWrapper()
// whether you want here replayLimited or some other logic depends on your needs
val subj = ConcurrentSubject.replayLimited[String](bufferLines)
val proc = Process(cmd).run(BasicIO(withIn = false,
line => finished.synchronized {
if (!finished.finished)
subj.onNext(line)
}, None))
// unfortunately we have to block a whole thread just to wait for the exit code
val exitThread = new Thread(() => {
try {
val exitCode = proc.exitValue()
finished.synchronized {
if (!finished.finished) {
finished.finished = true
if (exitCode != 0) {
subj.onError(new RuntimeException(s"Process '$cmd' has exited with $exitCode."))
}
else {
subj.onComplete()
}
}
}
}
catch {
// ignore when this is a result of our timeout
case e: InterruptedException => if(!finished.finished) e.printStackTrace()
}
}, "Process-exit-wait")
exitThread.start()
subj.timeoutOnSlowUpstream(timeout)
.guarantee(Task(finished.synchronized {
if (!finished.finished) {
finished.finished = true
proc.destroy()
exitThread.interrupt()
}
}))
}
}
Пример использования будет выглядеть примерно так:
def test(): Unit = {
import monix.execution.Ack._
import monix.reactive._
import scala.concurrent._
import scala.concurrent.duration._
import monix.execution.Scheduler.Implicits.global
val linesO = ProcessHelper.buildProcessLinesObservable("python3 test.py", 5 seconds, 2) // buffer is reduced to just 2 lines just for this example
linesO.subscribe(new Observer[String] {
override def onNext(s: String): Future[Ack] = {
println(s"Received '$s'")
Future.successful(Continue)
}
override def onError(ex: Throwable): Unit = println(s"Error '$ex'")
override def onComplete(): Unit = println("Complete")
})
try {
println(linesO.toListL.runSyncUnsafe())
println(linesO.toListL.runSyncUnsafe()) // second run will show only last 2 values because of the reduced buffer size
println("Finish success")
}
catch {
case e: Throwable => println("Failed with " + e)
}
}
What do you mean by "not always reliable"? If the system succeeds in setting your socket non non-blocking, it will be non-blocking. Socket operations will return EWOULDBLOCK
if they would block need to block (e.g. if the output buffer is full and you're calling send/write too often).
This forum thread has a few good points when working with non-blocking calls.
Если Вы хотите измениться , сокет к не блокированию , точно принимают () к НЕ Заблокированный состояние затем
int flags=fcntl(master_socket, F_GETFL);
fcntl(master_socket, F_SETFL,flags| O_NONBLOCK); /* Change the socket into non-blocking state F_SETFL is a command saying set flag and flag is 0_NONBLOCK */
if((newSocket = accept(master_socket, (struct sockaddr *) &address, &addr_size))<0){
if(errno==EWOULDBLOCK){
puts("\n No clients currently available............ \n");
}
}else{
puts("\nClient approched............ \n");
}
fcntl ()
всегда надежно работал у меня. В любом случае, вот функция, которую я использую для включения / отключения блокировки сокета:
#include <fcntl.h>
/** Returns true on success, or false if there was an error */
bool SetSocketBlockingEnabled(int fd, bool blocking)
{
if (fd < 0) return false;
#ifdef _WIN32
unsigned long mode = blocking ? 0 : 1;
return (ioctlsocket(fd, FIONBIO, &mode) == 0) ? true : false;
#else
int flags = fcntl(fd, F_GETFL, 0);
if (flags == -1) return false;
flags = blocking ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);
return (fcntl(fd, F_SETFL, flags) == 0) ? true : false;
#endif
}
Обычно вы можете добиться того же эффекта, используя обычную блокировку ввода-вывода и мультиплексирование нескольких операций ввода-вывода с помощью select (2)
, poll (2)
или некоторые другие системные вызовы, доступные в вашей системе.
См. Проблема C10K для сравнения подходов к масштабируемому мультиплексированию ввода-вывода.
Лучший способ настроить сокет как неблокирующий в C - использовать ioctl. Пример, когда принятый сокет настроен на неблокирование, следующий:
long on = 1L;
unsigned int len;
struct sockaddr_storage remoteAddress;
len = sizeof(remoteAddress);
int socket = accept(listenSocket, (struct sockaddr *)&remoteAddress, &len)
if (ioctl(socket, (int)FIONBIO, (char *)&on))
{
printf("ioctl FIONBIO call failed\n");
}
fcntl()
or ioctl()
are used to set the properties for file streams. When you use this function to make a socket non-blocking, function like accept()
, recv()
and etc, which are blocking in nature will return error and errno
would be set to EWOULDBLOCK
. You can poll file descriptor sets to poll on sockets.