Как добавить финализатор на TVar

Предпосылки

В ответ на вопрос я построил и загрузил ограниченный- tchan (было бы неправильно загружать версию jnb ). Если имени недостаточно, bounded-tchan (BTChan) - это канал STM с максимальной пропускной способностью (записывает блок, если канал заполнен).

Недавно я получил запрос на добавление дубликата. функция как в обычном TChan . Так начинается проблема.

Как выглядит BTChan

Ниже приведено упрощенное (и фактически нефункциональное) представление BTChan.

data BTChan a = BTChan
    { max :: Int
    , count :: TVar Int
    , channel :: TVar [(Int, a)]
    , nrDups  :: TVar Int
    }

Каждый раз, когда вы пишете на канал, вы указываете количество дубликатов ( nrDups ) в кортеже - это «счетчик отдельного элемента», который указывает, сколько читателей получили этот элемент.

Каждый читатель будет уменьшать счетчик для элемента, который он читает, затем перемещать его указатель чтения на следующий элемент в списке. Если устройство чтения уменьшает счетчик до нуля, то значение count уменьшается, чтобы должным образом отражать доступную пропускную способность на канале.

Для ясности в желаемой семантике: емкость канала указывает максимальное количество элементов в очереди на канале. Любой заданный элемент ставится в очередь до тех пор, пока читатель каждого дублирования не получит элемент. Ни один элемент не должен оставаться в очереди для дублирования с GC (это основная проблема).

Например, пусть будет три дублирования канала (c1, c2, c3) с емкостью 2, где 2 элемента были записаны в channel тогда все элементы были прочитаны из c1 и c2 . Канал все еще заполнен (0 оставшейся емкости), поскольку c3 не использовал свои копии. В любой момент времени, если все ссылки на c3 будут отброшены (так что c3 будет GCed), то емкость должна быть освобождена (восстановлена ​​до 2 в данном случае).

Вот проблема: скажем, у меня есть следующий код

c <- newBTChan 1
_ <- dupBTChan c  -- This represents what would probably be a pathological bug or terminated reader
writeBTChan c "hello"
_ <- readBTChan c

Причина, по которой BTChan выглядит так:

BTChan 1 (TVar 0) (TVar []) (TVar 1)             -->   -- newBTChan
BTChan 1 (TVar 0) (TVar []) (TVar 2)             -->   -- dupBTChan
BTChan 1 (TVar 1) (TVar [(2, "hello")]) (TVar 2) -->   -- readBTChan c
BTChan 1 (TVar 1) (TVar [(1, "hello")]) (TVar 2)       -- OH NO!

Обратите внимание на счетчик чтения для "привет" в конце по-прежнему 1 ? Это означает, что сообщение не считается пропавшим (даже если в реальной реализации оно будет скопировано), и наше счетчик никогда не будет уменьшаться. Поскольку канал заполнен (максимум 1 элемент), средства записи всегда будут блокироваться.

Я хочу, чтобы финализатор создавался каждый раз при вызове dupBTChan . Когда дублированный (или исходный) канал собирается, для всех элементов, остающихся для чтения на этом канале, будет уменьшено количество элементов, также будет уменьшена переменная nrDups . В результате будущие записи будут иметь правильный счетчик ( счетчик , который не резервирует место для переменных, не считываемых каналами GC).

Решение 1 - Ручной ресурс Управление (чего я хочу избежать)

JNB ' По этой причине s bounded-tchan фактически имеет ручное управление ресурсами. См. cancelBTChan . Я собираюсь сделать что-то более сложное, чтобы пользователь мог ошибиться (не то чтобы ручное управление во многих случаях было неправильным).

Решение 2. Используйте исключения, блокируя TVars (GHC не может этого сделать как я хочу)

ИЗМЕНИТЬ это решение, и решение 3, которое является побочным продуктом, не работает! Из-за ошибки 5055 (WONTFIX) компилятор GHC отправляет исключения в оба заблокированных потока, даже если одного достаточно (что теоретически определяется, но не практично с GHC GC).

Если все способы чтобы получить BTChan IO, мы можем forkIO поток, который читает / повторяет попытки в дополнительном (фиктивном) поле TVar, уникальном для данного BTChan . Новый поток перехватит исключение, когда все другие ссылки на TVar будут отброшены, поэтому он будет знать, когда уменьшать nrDups и счетчики отдельных элементов. Это должно работать, но вынуждает всех моих пользователей использовать ввод-вывод для получения своих BTChan s:

data BTChan = BTChan { ... as before ..., dummyTV :: TVar () }

dupBTChan :: BTChan a -> IO (BTChan a)
dupBTChan c = do
       ... as before ...
       d <- newTVarIO ()
       let chan = BTChan ... d
       forkIO $ watchChan chan
       return chan

watchBTChan :: BTChan a -> IO ()
watchBTChan b = do
    catch (atomically (readTVar (dummyTV b) >> retry)) $ \e -> do
    case fromException e of
        BlockedIndefinitelyOnSTM -> atomically $ do -- the BTChan must have gotten collected
            ls <- readTVar (channel b)
            writeTVar (channel b) (map (\(a,b) -> (a-1,b)) ls)
            readTVar (nrDup b) >>= writeTVar (nrDup b) . (-1)
        _ -> watchBTChan b

РЕДАКТИРОВАТЬ: Да, это плохой финализатор, и у меня нет особых причин избегать использования addFinalizer . Это будет то же самое решение, по-прежнему требующее использования IO afaict.

Решение 3: Более чистый API, чем решение 2, но GHC все еще не поддерживает его

Пользователи запускают поток диспетчера, вызывая initBTChanCollector , который будет контролировать набор этих фиктивных телевизоров (из решения 2) и выполнять необходимую очистку. В основном, он отправляет ввод-вывод в другой поток, который знает, что делать, через глобальный ( unsafePerformIO ed) TVar . Все работает в основном как решение 2, но создание BTChan все еще может быть STM. Отказ запустить initBTChanCollector приведет к постоянно растущему списку (утечка пространства) задач по мере выполнения процесса.

Решение 4. Никогда не разрешайте отбрасывать BTChan s

Это сродни игнорированию проблемы. Если пользователь так и не сбросил дублированный BTChan , проблема исчезнет.

Решение 5

Решение 4. Никогда не позволяйте сбрасывать BTChan s

Это похоже на игнорирование проблемы. Если пользователь так и не сбросил дублированный BTChan , проблема исчезнет.

Решение 5

Решение 4. Никогда не позволяйте сбрасывать BTChan s

Это похоже на игнорирование проблемы. Если пользователь так и не сбросил дублированный BTChan , проблема исчезнет.

Решение 5 Я вижу ответ ezyang (полностью верный и признанный), но мне бы очень хотелось сохранить текущий API только с помощью функции dup.

** Решение 6 ** Подскажите, пожалуйста, лучший вариант.

РЕДАКТИРОВАТЬ: Я реализовал решение 3 (полностью непроверенный альфа-релиз) и обработал потенциальную утечку пространства, сделав сам глобал BTChan - этот канал, вероятно, должен иметь емкость 1, поэтому забыл запустить init появляется очень быстро, но это незначительное изменение. Это работает в GHCi (7.0.3), но кажется случайным. GHC генерирует исключения для обоих заблокированных потоков (действительного, читающего BTChan, и наблюдающего потока), поэтому, если вы заблокированы при чтении BTChan, когда другой поток отбрасывает ссылку, вы умираете.

5
задан Community 23 May 2017 в 09:58
поделиться