periodically purge request parts
This commit is contained in:
parent
bcd1c34c7c
commit
88104de9bf
|
@ -216,9 +216,21 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry
|
||||||
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
|
data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer)
|
||||||
POSIXTime
|
POSIXTime
|
||||||
|
|
||||||
|
-- TODO: make purge age configurable
|
||||||
|
-- | periodically clean up old request parts
|
||||||
|
purgeAge :: POSIXTime
|
||||||
|
purgeAge = 60 -- seconds
|
||||||
|
|
||||||
requestMapPurge :: MVar RequestMap -> IO ()
|
requestMapPurge :: MVar RequestMap -> IO ()
|
||||||
-- PLACEHOLDER
|
requestMapPurge mapVar = forever $ do
|
||||||
requestMapPurge mapVar = pure ()
|
rMapState <- takeMVar mapVar
|
||||||
|
now <- getPOSIXTime
|
||||||
|
putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
|
||||||
|
now - ts < purgeAge
|
||||||
|
) rMapState
|
||||||
|
threadDelay $ fromEnum purgeAge * 2000
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
|
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
|
||||||
-- and pass them to their specific handling function.
|
-- and pass them to their specific handling function.
|
||||||
|
@ -230,7 +242,9 @@ fediMessageHandler sendQ recvQ ns = do
|
||||||
-- handling multipart messages:
|
-- handling multipart messages:
|
||||||
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
-- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness.
|
||||||
requestMap <- newMVar (Map.empty :: RequestMap)
|
requestMap <- newMVar (Map.empty :: RequestMap)
|
||||||
forever $ do
|
-- run receive loop and requestMapPurge concurrently, so that an exception makes
|
||||||
|
-- both of them fail
|
||||||
|
concurrently_ (requestMapPurge requestMap) $ forever $ do
|
||||||
-- wait for incoming messages
|
-- wait for incoming messages
|
||||||
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
(rawMsg, sourceAddr) <- atomically $ readTQueue recvQ
|
||||||
let aMsg = deserialiseMessage rawMsg
|
let aMsg = deserialiseMessage rawMsg
|
||||||
|
|
|
@ -42,8 +42,6 @@ main = do
|
||||||
wait =<< async (fediMainThreads serverSock thisNode)
|
wait =<< async (fediMainThreads serverSock thisNode)
|
||||||
)
|
)
|
||||||
joinedState
|
joinedState
|
||||||
-- stop main thread from terminating during development
|
|
||||||
getChar
|
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue