diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 2a8e151..8a367f2 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -216,9 +216,21 @@ type RequestMap = Map.Map (SockAddr, Integer) RequestMapEntry data RequestMapEntry = RequestMapEntry (Set.Set FediChordMessage) (Maybe Integer) POSIXTime +-- TODO: make purge age configurable +-- | periodically clean up old request parts +purgeAge :: POSIXTime +purgeAge = 60 -- seconds + requestMapPurge :: MVar RequestMap -> IO () --- PLACEHOLDER -requestMapPurge mapVar = pure () +requestMapPurge mapVar = forever $ do + rMapState <- takeMVar mapVar + now <- getPOSIXTime + putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> + now - ts < purgeAge + ) rMapState + threadDelay $ fromEnum purgeAge * 2000 + + -- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- and pass them to their specific handling function. @@ -230,7 +242,9 @@ fediMessageHandler sendQ recvQ ns = do -- handling multipart messages: -- Request parts can be insert into a map (key: (sender IP against spoofing, request ID), value: timestamp + set of message parts, handle all of them when size of set == parts) before being handled. This map needs to be purged periodically by a separate thread and can be protected by an MVar for fairness. requestMap <- newMVar (Map.empty :: RequestMap) - forever $ do + -- run receive loop and requestMapPurge concurrently, so that an exception makes + -- both of them fail + concurrently_ (requestMapPurge requestMap) $ forever $ do -- wait for incoming messages (rawMsg, sourceAddr) <- atomically $ readTQueue recvQ let aMsg = deserialiseMessage rawMsg diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index a837cc5..554585f 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -42,8 +42,6 @@ main = do wait =<< async (fediMainThreads serverSock thisNode) ) joinedState - -- stop main thread from terminating during development - getChar pure ()