Предпосылки
В ответ на вопрос я построил и загрузил ограниченный- tchan (было бы неправильно загружать версию jnb ). Если имени недостаточно, bounded-tchan (BTChan) - это канал STM с максимальной пропускной способностью (записывает блок, если канал заполнен).
Недавно я получил запрос на добавление дубликата. функция как в обычном TChan . Так начинается проблема.
Как выглядит BTChan
Ниже приведено упрощенное (и фактически нефункциональное) представление BTChan.
data BTChan a = BTChan
{ max :: Int
, count :: TVar Int
, channel :: TVar [(Int, a)]
, nrDups :: TVar Int
}
Каждый раз, когда вы пишете на канал, вы указываете количество дубликатов ( nrDups
) в кортеже - это «счетчик отдельного элемента», который указывает, сколько читателей получили этот элемент.
Каждый читатель будет уменьшать счетчик для элемента, который он читает, затем перемещать его указатель чтения на следующий элемент в списке. Если устройство чтения уменьшает счетчик до нуля, то значение count
уменьшается, чтобы должным образом отражать доступную пропускную способность на канале.
Для ясности в желаемой семантике: емкость канала указывает максимальное количество элементов в очереди на канале. Любой заданный элемент ставится в очередь до тех пор, пока читатель каждого дублирования не получит элемент. Ни один элемент не должен оставаться в очереди для дублирования с GC (это основная проблема).
Например, пусть будет три дублирования канала (c1, c2, c3) с емкостью 2, где 2 элемента были записаны в channel тогда все элементы были прочитаны из c1
и c2
. Канал все еще заполнен (0 оставшейся емкости), поскольку c3
не использовал свои копии. В любой момент времени, если все ссылки на c3
будут отброшены (так что c3
будет GCed), то емкость должна быть освобождена (восстановлена до 2 в данном случае).
Вот проблема: скажем, у меня есть следующий код
c <- newBTChan 1
_ <- dupBTChan c -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c
Причина, по которой BTChan выглядит так:
BTChan 1 (TVar 0) (TVar []) (TVar 1) --> -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2) --> -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) --> -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2) -- OH NO!
Обратите внимание на счетчик чтения для "привет" в конце
по-прежнему 1
? Это означает, что сообщение не считается пропавшим (даже если в реальной реализации оно будет скопировано), и наше счетчик
никогда не будет уменьшаться. Поскольку канал заполнен (максимум 1 элемент), средства записи всегда будут блокироваться.
Я хочу, чтобы финализатор создавался каждый раз при вызове dupBTChan
. Когда дублированный (или исходный) канал собирается, для всех элементов, остающихся для чтения на этом канале, будет уменьшено количество элементов, также будет уменьшена переменная nrDups
. В результате будущие записи будут иметь правильный счетчик
( счетчик
, который не резервирует место для переменных, не считываемых каналами GC).
Решение 1 - Ручной ресурс Управление (чего я хочу избежать)
JNB ' По этой причине s bounded-tchan фактически имеет ручное управление ресурсами. См. cancelBTChan
. Я собираюсь сделать что-то более сложное, чтобы пользователь мог ошибиться (не то чтобы ручное управление во многих случаях было неправильным).
Решение 2. Используйте исключения, блокируя TVars (GHC не может этого сделать как я хочу)
ИЗМЕНИТЬ это решение, и решение 3, которое является побочным продуктом, не работает! Из-за ошибки 5055 (WONTFIX) компилятор GHC отправляет исключения в оба заблокированных потока, даже если одного достаточно (что теоретически определяется, но не практично с GHC GC).
Если все способы чтобы получить BTChan
IO, мы можем forkIO
поток, который читает / повторяет попытки в дополнительном (фиктивном) поле TVar, уникальном для данного BTChan
. Новый поток перехватит исключение, когда все другие ссылки на TVar будут отброшены, поэтому он будет знать, когда уменьшать nrDups
и счетчики отдельных элементов. Это должно работать, но вынуждает всех моих пользователей использовать ввод-вывод для получения своих BTChan
s:
data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }
dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
... as before ...
d <- newTVarIO ()
let chan = BTChan ... d
forkIO $ watchChan chan
return chan
watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
case fromException e of
BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
ls <- readTVar (channel b)
writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
_ -> watchBTChan b
РЕДАКТИРОВАТЬ: Да, это плохой финализатор, и у меня нет особых причин избегать использования addFinalizer
. Это будет то же самое решение, по-прежнему требующее использования IO afaict.
Решение 3: Более чистый API, чем решение 2, но GHC все еще не поддерживает его
Пользователи запускают поток диспетчера, вызывая initBTChanCollector
, который будет контролировать набор этих фиктивных телевизоров (из решения 2) и выполнять необходимую очистку. В основном, он отправляет ввод-вывод в другой поток, который знает, что делать, через глобальный ( unsafePerformIO
ed) TVar
. Все работает в основном как решение 2, но создание BTChan все еще может быть STM. Отказ запустить initBTChanCollector
приведет к постоянно растущему списку (утечка пространства) задач по мере выполнения процесса.
Решение 4. Никогда не разрешайте отбрасывать BTChan
s
Это сродни игнорированию проблемы. Если пользователь так и не сбросил дублированный BTChan
, проблема исчезнет.
Решение 5
Решение 4. Никогда не позволяйте сбрасывать BTChan
s
Это похоже на игнорирование проблемы. Если пользователь так и не сбросил дублированный BTChan
, проблема исчезнет.
Решение 5
Решение 4. Никогда не позволяйте сбрасывать BTChan
s
Это похоже на игнорирование проблемы. Если пользователь так и не сбросил дублированный BTChan
, проблема исчезнет.
Решение 5 Я вижу ответ ezyang (полностью верный и признанный), но мне бы очень хотелось сохранить текущий API только с помощью функции dup.
** Решение 6 ** Подскажите, пожалуйста, лучший вариант.
РЕДАКТИРОВАТЬ:
Я реализовал решение 3 (полностью непроверенный альфа-релиз) и обработал потенциальную утечку пространства, сделав сам глобал BTChan
- этот канал, вероятно, должен иметь емкость 1, поэтому забыл запустить init
появляется очень быстро, но это незначительное изменение. Это работает в GHCi (7.0.3), но кажется случайным. GHC генерирует исключения для обоих заблокированных потоков (действительного, читающего BTChan, и наблюдающего потока), поэтому, если вы заблокированы при чтении BTChan, когда другой поток отбрасывает ссылку, вы умираете.