adjust rest of code to new message structure
This commit is contained in:
parent
f8d444d5b6
commit
2542091379
|
@ -177,7 +177,7 @@ requestJoin :: NodeState a => a -- ^ currently responsible node to b
|
|||
-> IO (Either String LocalNodeState) -- ^ node after join with all its new information
|
||||
requestJoin toJoinOn ownState =
|
||||
bracket (mkSendSocket (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
|
||||
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 1 Join (Just JoinRequestPayload)) sock
|
||||
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
|
||||
joinedStateUnsorted <- foldM
|
||||
(\nsAcc msg -> case payload msg of
|
||||
Nothing -> pure nsAcc
|
||||
|
@ -259,7 +259,7 @@ sendQueryIdMessage :: NodeID -- ^ target key ID to look u
|
|||
-> IO (Set.Set FediChordMessage) -- ^ responses
|
||||
sendQueryIdMessage targetID ns = sendRequestTo 5000 3 (lookupMessage targetID ns)
|
||||
where
|
||||
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 1 QueryID (Just $ pl ns targetID)
|
||||
lookupMessage targetID ns rID = Request rID (toRemoteNodeState ns) 1 True QueryID (Just $ pl ns targetID)
|
||||
pl ns' targetID' = QueryIDRequestPayload { queryTargetID = targetID', queryLBestNodes = fromIntegral . lNumBestNodes $ ns }
|
||||
|
||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||
|
@ -281,7 +281,6 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
|||
recvdParts <- atomically $ flushTBQueue responseQ
|
||||
pure $ Set.fromList recvdParts
|
||||
where
|
||||
-- state reingeben: state = noch nicht geackte messages, result = responses
|
||||
sendAndAck :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Socket -- ^ the socket used for sending and receiving for this particular remote node
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
|
@ -289,27 +288,28 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
|
|||
sendAndAck responseQueue sock remainingSends = do
|
||||
sendMany sock $ Map.elems remainingSends
|
||||
-- if all requests have been acked/ responded to, return prematurely
|
||||
recvLoop responseQueue remainingSends Set.empty
|
||||
recvLoop responseQueue remainingSends Set.empty Nothing
|
||||
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses
|
||||
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
|
||||
-> Set.Set Integer -- ^ already received response part numbers
|
||||
-> Maybe Integer -- ^ total number of response parts if already known
|
||||
-> IO ()
|
||||
recvLoop responseQueue remainingSends' receivedPartNums = do
|
||||
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do
|
||||
-- 65535 is maximum length of UDP packets, as long as
|
||||
-- no IPv6 jumbograms are used
|
||||
response <- deserialiseMessage <$> recv sock 65535
|
||||
case response of
|
||||
-- drop errors
|
||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums
|
||||
Right msg -> do
|
||||
Right msg@Response{} -> do
|
||||
atomically $ writeTBQueue responseQueue msg
|
||||
let
|
||||
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
|
||||
newRemaining = Map.delete (part msg) remainingSends'
|
||||
newReceivedParts = Set.insert (part msg) receivedPartNums
|
||||
-- ToDo: handle responses with more parts than the request
|
||||
if Map.null newRemaining && Set.size receivedPartNums == fromIntegral (parts msg)
|
||||
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
|
||||
then pure ()
|
||||
else recvLoop responseQueue newRemaining receivedPartNums
|
||||
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts
|
||||
-- drop errors and invalid messages
|
||||
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts
|
||||
|
||||
|
||||
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
|
||||
|
|
|
@ -196,16 +196,16 @@ spec = do
|
|||
requestTemplate = Request {
|
||||
requestID = 2342
|
||||
, sender = exampleNodeState
|
||||
, parts = 1
|
||||
, part = 1
|
||||
, isFinalPart = True
|
||||
, action = undefined
|
||||
, payload = undefined
|
||||
}
|
||||
responseTemplate = Response {
|
||||
responseTo = 2342
|
||||
, senderID = nid exampleNodeState
|
||||
, parts = 1
|
||||
, part = 1
|
||||
, isFinalPart = True
|
||||
, action = undefined
|
||||
, payload = undefined
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue