From 2c98d8507da7d53a76ba7adaa782c8a7a58f5a49 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 9 Jun 2020 22:11:38 +0200 Subject: [PATCH] implement stabilise request sending and parsing contributes to #44 --- src/Hash2Pub/DHTProtocol.hs | 52 +++++++++++++++++++++++++++++++------ 1 file changed, 44 insertions(+), 8 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index f1eda71..73e564f 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -251,9 +251,9 @@ respondQueryID nsSTM msgSet = do respondLeave :: LocalNodeStateSTM -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondLeave nsSTM msgSet = do -- combine payload of all parts - let (requestSuccs, requestPreds) = foldr' (\msg (succAcc, predAcc) -> - (maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg) - ,maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg)) + let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) -> + (maybe predAcc (++ predAcc) (leavePredecessors <$> payload msg) + ,maybe succAcc (++ succAcc) (leaveSuccessors <$> payload msg)) ) ([],[]) msgSet aRequestPart = Set.elemAt 0 msgSet @@ -407,13 +407,13 @@ requestJoin toJoinOn ownStateSTM = pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ - if responses == Set.empty - then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) + pure $ if responses == Set.empty + then Left $ "join error: got no response from " <> show (getNid toJoinOn) else if null (predecessors joinedState) && null (successors joinedState) - then pure $ Left "join error: no predecessors or successors" + then Left "join error: no predecessors or successors" -- successful join - else pure $ Right ownStateSTM - ) + else Right ownStateSTM + ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) @@ -478,6 +478,42 @@ sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID) pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns } + +-- | Send a stabilise request to provided 'RemoteNode' and, if successful, +-- return parsed neighbour lists +requestStabilise :: LocalNodeState -- ^ sending node + -> RemoteNodeState -- ^ neighbour node to send to + -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node +requestStabilise ns neighbour = do + responses <- bracket (mkSendSocket (getDomain neighbour) (getDhtPort neighbour)) close (sendRequestTo 5000 3 (\rid -> + Request { + requestID = rid + , sender = toRemoteNodeState ns + , part = 1 + , isFinalPart = False + , action = Stabilise + , payload = Just StabiliseRequestPaylod + } + ) + ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) + either + -- forward IO error messages + (pure . Left) + (\respSet -> do + -- fold all reply parts together + let (responsePreds, responseSuccs) = foldr' (\msg (predAcc, succAcc) -> + (maybe predAcc (++ predAcc) (stabilisePredecessors <$> payload msg) + ,maybe succAcc (++ succAcc) (stabiliseSuccessors <$> payload msg)) + ) + ([],[]) respSet + pure $ if null responsePreds && null responseSuccs + then Left "no neighbours returned" + else Right (responsePreds, responseSuccs) + ) responses + + + + -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. sendRequestTo :: Int -- ^ timeout in seconds