parent
f15d83baff
commit
2c98d8507d
|
@ -251,9 +251,9 @@ respondQueryID nsSTM msgSet = do
|
||||||
respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||||
respondLeave nsSTM msgSet = do
|
respondLeave nsSTM msgSet = do
|
||||||
-- combine payload of all parts
|
-- combine payload of all parts
|
||||||
let (requestSuccs, requestPreds) = foldr' (\msg (succAcc, predAcc) ->
|
let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) ->
|
||||||
(maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg)
|
(maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg)
|
||||||
,maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg))
|
,maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg))
|
||||||
)
|
)
|
||||||
([],[]) msgSet
|
([],[]) msgSet
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
aRequestPart = Set.elemAt 0 msgSet
|
||||||
|
@ -407,13 +407,13 @@ requestJoin toJoinOn ownStateSTM =
|
||||||
pure (cacheInsertQ, newState)
|
pure (cacheInsertQ, newState)
|
||||||
-- execute the cache insertions
|
-- execute the cache insertions
|
||||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||||
if responses == Set.empty
|
pure $ if responses == Set.empty
|
||||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
then Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||||
else if null (predecessors joinedState) && null (successors joinedState)
|
else if null (predecessors joinedState) && null (successors joinedState)
|
||||||
then pure $ Left "join error: no predecessors or successors"
|
then Left "join error: no predecessors or successors"
|
||||||
-- successful join
|
-- successful join
|
||||||
else pure $ Right ownStateSTM
|
else Right ownStateSTM
|
||||||
)
|
)
|
||||||
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||||
|
|
||||||
|
|
||||||
|
@ -478,6 +478,42 @@ sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns
|
||||||
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
|
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
|
||||||
|
|
||||||
|
|
||||||
|
-- | Send a stabilise request to provided 'RemoteNode' and, if successful,
|
||||||
|
-- return parsed neighbour lists
|
||||||
|
requestStabilise :: LocalNodeState -- ^ sending node
|
||||||
|
-> RemoteNodeState -- ^ neighbour node to send to
|
||||||
|
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
|
||||||
|
requestStabilise ns neighbour = do
|
||||||
|
responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (sendRequestTo 5000 3 (\rid ->
|
||||||
|
Request {
|
||||||
|
requestID = rid
|
||||||
|
, sender = toRemoteNodeState ns
|
||||||
|
, part = 1
|
||||||
|
, isFinalPart = False
|
||||||
|
, action = Stabilise
|
||||||
|
, payload = Just StabiliseRequestPaylod
|
||||||
|
}
|
||||||
|
)
|
||||||
|
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||||
|
either
|
||||||
|
-- forward IO error messages
|
||||||
|
(pure . Left)
|
||||||
|
(\respSet -> do
|
||||||
|
-- fold all reply parts together
|
||||||
|
let (responsePreds, responseSuccs) = foldr' (\msg (predAcc, succAcc) ->
|
||||||
|
(maybe predAcc (++ predAcc) (stabilisePredecessors <$> payload msg)
|
||||||
|
,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg))
|
||||||
|
)
|
||||||
|
([],[]) respSet
|
||||||
|
pure $ if null responsePreds && null responseSuccs
|
||||||
|
then Left "no neighbours returned"
|
||||||
|
else Right (responsePreds, responseSuccs)
|
||||||
|
) responses
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||||
sendRequestTo :: Int -- ^ timeout in seconds
|
sendRequestTo :: Int -- ^ timeout in seconds
|
||||||
|
|
Loading…
Reference in a new issue