bracket all socket-using operations to avoid resource leakage
This commit is contained in:
parent
b1c5c5e5f4
commit
27e5c5f9ce
|
@ -23,6 +23,7 @@ module Hash2Pub.DHTProtocol
|
||||||
where
|
where
|
||||||
|
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
|
import Control.Exception
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.STM.TBQueue
|
import Control.Concurrent.STM.TBQueue
|
||||||
import Control.Concurrent.STM.TQueue
|
import Control.Concurrent.STM.TQueue
|
||||||
|
@ -146,28 +147,29 @@ markCacheEntryAsVerified timestamp = Map.adjust adjustFunc
|
||||||
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
|
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
|
||||||
-> LocalNodeState -- ^ joining NodeState
|
-> LocalNodeState -- ^ joining NodeState
|
||||||
-> IO (Maybe LocalNodeState) -- ^ node after join with all its new information
|
-> IO (Maybe LocalNodeState) -- ^ node after join with all its new information
|
||||||
requestJoin toJoinOn ownState = do
|
requestJoin toJoinOn ownState =
|
||||||
sock <- mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)
|
bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
||||||
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock
|
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock
|
||||||
joinedStateUnsorted <- foldM
|
joinedStateUnsorted <- foldM
|
||||||
(\nsAcc msg -> case payload msg of
|
(\nsAcc msg -> case payload msg of
|
||||||
Nothing -> pure nsAcc
|
Nothing -> pure nsAcc
|
||||||
Just msgPl -> do
|
Just msgPl -> do
|
||||||
-- add transfered cache entries to global NodeCache
|
-- add transfered cache entries to global NodeCache
|
||||||
queueAddEntries (joinCache msgPl) nsAcc
|
queueAddEntries (joinCache msgPl) nsAcc
|
||||||
-- add received predecessors and successors
|
-- add received predecessors and successors
|
||||||
let
|
let
|
||||||
addPreds ns' = setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns'
|
addPreds ns' = setPredecessors (foldr' (:) (predecessors ns') (joinPredecessors msgPl)) ns'
|
||||||
addSuccs ns' = setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns'
|
addSuccs ns' = setSuccessors (foldr' (:) (successors ns') (joinSuccessors msgPl)) ns'
|
||||||
pure $ addSuccs . addPreds $ nsAcc
|
pure $ addSuccs . addPreds $ nsAcc
|
||||||
)
|
)
|
||||||
-- reset predecessors and successors
|
-- reset predecessors and successors
|
||||||
(setPredecessors [] . setSuccessors [] $ ownState)
|
(setPredecessors [] . setSuccessors [] $ ownState)
|
||||||
responses
|
responses
|
||||||
if responses == Set.empty
|
if responses == Set.empty
|
||||||
then pure Nothing
|
then pure Nothing
|
||||||
-- sort successors and predecessors
|
-- sort successors and predecessors
|
||||||
else pure . Just . setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted
|
else pure . Just . setSuccessors (sortBy localCompare $ successors joinedStateUnsorted) . setPredecessors (sortBy localCompare $ predecessors joinedStateUnsorted) $ joinedStateUnsorted
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
|
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
|
||||||
|
@ -192,10 +194,9 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
|
||||||
case localResult of
|
case localResult of
|
||||||
FOUND thisNode -> pure thisNode
|
FOUND thisNode -> pure thisNode
|
||||||
FORWARD nodeSet -> do
|
FORWARD nodeSet -> do
|
||||||
-- create connected sockets to all query targets
|
-- create connected sockets to all query targets and use them for request handling
|
||||||
sockets <- mapM (\resultNode -> mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) $ remoteNode <$> Set.toList nodeSet
|
|
||||||
-- ToDo: make attempts and timeout configurable
|
-- ToDo: make attempts and timeout configurable
|
||||||
queryThreads <- mapM (async . sendQueryIdMessage targetID ns) sockets
|
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket (getDomain resultNode) (getDhtPort resultNode)) close (sendQueryIdMessage targetID ns)) $ remoteNode <$> Set.toList nodeSet
|
||||||
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
|
||||||
-- ToDo: exception handling, maybe log them
|
-- ToDo: exception handling, maybe log them
|
||||||
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
|
responses <- (mconcat . fmap Set.elems) . rights <$> mapM waitCatch queryThreads
|
||||||
|
|
|
@ -117,26 +117,27 @@ fediChordBootstrapJoin :: LocalNodeState -- ^ the local 'NodeState'
|
||||||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||||
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a
|
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a
|
||||||
-- successful join, otherwise an error message
|
-- successful join, otherwise an error message
|
||||||
fediChordBootstrapJoin ns (joinHost, joinPort) = do
|
fediChordBootstrapJoin ns (joinHost, joinPort) =
|
||||||
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined
|
||||||
sock <- mkSendSocket joinHost joinPort
|
bracket (mkSendSocket joinHost joinPort) close (\sock -> do
|
||||||
-- 1. get routed to placement of own ID until FOUND:
|
-- 1. get routed to placement of own ID until FOUND:
|
||||||
-- Initialise an empty cache only with the responses from a bootstrapping node
|
-- Initialise an empty cache only with the responses from a bootstrapping node
|
||||||
bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock
|
bootstrapResponse <- sendQueryIdMessage (getNid ns) ns sock
|
||||||
if bootstrapResponse == Set.empty
|
if bootstrapResponse == Set.empty
|
||||||
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
|
then pure . Left $ "Bootstrapping node " <> show joinHost <> " gave no response."
|
||||||
else do
|
else do
|
||||||
now <- getPOSIXTime
|
now <- getPOSIXTime
|
||||||
-- create new cache with all returned node responses
|
-- create new cache with all returned node responses
|
||||||
let bootstrapCache =
|
let bootstrapCache =
|
||||||
-- traverse response parts
|
-- traverse response parts
|
||||||
foldr' (\resp cacheAcc -> case queryResult <$> payload resp of
|
foldr' (\resp cacheAcc -> case queryResult <$> payload resp of
|
||||||
Nothing -> cacheAcc
|
Nothing -> cacheAcc
|
||||||
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
Just (FOUND result1) -> addCacheEntryPure now (RemoteCacheEntry result1 now) cacheAcc
|
||||||
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
|
||||||
)
|
)
|
||||||
initCache bootstrapResponse
|
initCache bootstrapResponse
|
||||||
fediChordJoin bootstrapCache ns
|
fediChordJoin bootstrapCache ns
|
||||||
|
)
|
||||||
|
|
||||||
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
|
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
|
||||||
-- node's position.
|
-- node's position.
|
||||||
|
|
|
@ -1,6 +1,7 @@
|
||||||
module Main where
|
module Main where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
|
import Control.Exception
|
||||||
import Data.IP (IPv6, toHostAddress6)
|
import Data.IP (IPv6, toHostAddress6)
|
||||||
import System.Environment
|
import System.Environment
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue