From 27e5c5f9cecab676181018a5774cbba635ed0f7f Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 27 May 2020 17:48:01 +0200 Subject: [PATCH] bracket all socket-using operations to avoid resource leakage --- src/Hash2Pub/DHTProtocol.hs | 51 +++++++++++++++++++------------------ src/Hash2Pub/FediChord.hs | 39 ++++++++++++++-------------- src/Hash2Pub/Main.hs | 1 + 3 files changed, 47 insertions(+), 44 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 588e846..1adcdc1 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -23,6 +23,7 @@ module Hash2Pub.DHTProtocol where import Control.Concurrent.Async +import Control.Exception import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TQueue @@ -146,28 +147,29 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted -> LocalNodeState -- ^ joining NodeState -> IO (Maybe LocalNodeState) -- ^ node after join with all its new information -requestJoin toJoinOn ownState = do - sock <- mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn) - responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock - joinedStateUnsorted <- foldM - (\nsAcc msg -> case payload msg of - Nothing -> pure nsAcc - Just msgPl -> do - -- add transfered cache entries to global NodeCache - queueAddEntries (joinCache msgPl) nsAcc - -- add received predecessors and successors - let - addPreds ns' = setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns' - addSuccs ns' = setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns' - pure $ addSuccs . addPreds $ nsAcc - ) - -- reset predecessors and successors - (setPredecessors [] . setSuccessors [] $ ownState) - responses - if responses == Set.empty - then pure Nothing - -- sort successors and predecessors - else pure . Just . setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted +requestJoin toJoinOn ownState = + bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do + responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock + joinedStateUnsorted <- foldM + (\nsAcc msg -> case payload msg of + Nothing -> pure nsAcc + Just msgPl -> do + -- add transfered cache entries to global NodeCache + queueAddEntries (joinCache msgPl) nsAcc + -- add received predecessors and successors + let + addPreds ns' = setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns' + addSuccs ns' = setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns' + pure $ addSuccs . addPreds $ nsAcc + ) + -- reset predecessors and successors + (setPredecessors [] . setSuccessors [] $ ownState) + responses + if responses == Set.empty + then pure Nothing + -- sort successors and predecessors + else pure . Just . setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted + ) -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. @@ -192,10 +194,9 @@ queryIdLookupLoop cacheSnapshot ns targetID = do case localResult of FOUND thisNode -> pure thisNode FORWARD nodeSet -> do - -- create connected sockets to all query targets - sockets <- mapM (\resultNode -> mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet + -- create connected sockets to all query targets and use them for request handling -- ToDo: make attempts and timeout configurable - queryThreads <- mapM (async . sendQueryIdMessage targetID ns) sockets + queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (sendQueryIdMessage targetID ns)) $ remoteNode <$> Set.toList nodeSet -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 9cce7eb..60d96ee 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -117,26 +117,27 @@ fediChordBootstrapJoin :: LocalNodeState -- ^ the local 'NodeState' -> (String, PortNumber) -- ^ domain and port of a bootstrapping node -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordBootstrapJoin ns (joinHost, joinPort) = do +fediChordBootstrapJoin ns (joinHost, joinPort) = -- can be invoked multiple times with all known bootstrapping nodes until successfully joined - sock <- mkSendSocket joinHost joinPort - -- 1. get routed to placement of own ID until FOUND: - -- Initialise an empty cache only with the responses from a bootstrapping node - bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock - if bootstrapResponse == Set.empty - then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response." - else do - now <- getPOSIXTime - -- create new cache with all returned node responses - let bootstrapCache = - -- traverse response parts - foldr' (\resp cacheAcc -> case queryResult <$> payload resp of - Nothing -> cacheAcc - Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc - Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset - ) - initCache bootstrapResponse - fediChordJoin bootstrapCache ns + bracket (mkSendSocket joinHost joinPort) close (\sock -> do + -- 1. get routed to placement of own ID until FOUND: + -- Initialise an empty cache only with the responses from a bootstrapping node + bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock + if bootstrapResponse == Set.empty + then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response." + else do + now <- getPOSIXTime + -- create new cache with all returned node responses + let bootstrapCache = + -- traverse response parts + foldr' (\resp cacheAcc -> case queryResult <$> payload resp of + Nothing -> cacheAcc + Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc + Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset + ) + initCache bootstrapResponse + fediChordJoin bootstrapCache ns + ) -- | join a node to the DHT, using the provided cache snapshot for resolving the new -- node's position. diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index 3fa5d47..4482012 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -1,6 +1,7 @@ module Main where import Control.Concurrent +import Control.Exception import Data.IP (IPv6, toHostAddress6) import System.Environment