diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index f462a26..c3cc858 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -77,7 +77,7 @@ import Hash2Pub.ASN1Coding import Hash2Pub.FediChordTypes (CacheEntry (..), CacheEntry (..), FediChordConf (..), - HasKeyID (..), + HasKeyID (..), LoadStats (..), LocalNodeState (..), LocalNodeStateSTM, NodeCache, NodeID, NodeState (..), @@ -95,6 +95,7 @@ import Hash2Pub.FediChordTypes (CacheEntry (..), rMapLookupSucc, setPredecessors, setSuccessors) import Hash2Pub.ProtocolTypes +import Hash2Pub.RingMap import Debug.Trace (trace) @@ -253,6 +254,15 @@ ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response { ackRequest _ = Map.empty +-- | extract the first payload from a received message set +extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload +extractFirstPayload msgSet = foldr' (\msg plAcc -> + if isNothing plAcc && isJust (payload msg) + then payload msg + else plAcc + ) Nothing msgSet + + -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- the response to be sent. handleIncomingRequest :: Service s (RealNodeSTM s) @@ -282,6 +292,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do -- only when joined Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing + QueryLoad -> if isJoined ns then Just <$> respondLoadQuery nsSTM msgSet else pure Nothing ) -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. @@ -321,12 +332,8 @@ respondQueryID nsSTM msgSet = do let aRequestPart = Set.elemAt 0 msgSet senderID = getNid . sender $ aRequestPart - senderPayload = foldr' (\msg plAcc -> - if isNothing plAcc && isJust (payload msg) - then payload msg - else plAcc - ) Nothing msgSet - -- return only empty message serialisation if no payload was included in parts + senderPayload = extractFirstPayload msgSet + -- return only empty message serialisation if no payload was included in parts maybe (pure Map.empty) (\senderPayload' -> do responseMsg <- atomically $ do nsSnap <- readTVar nsSTM @@ -426,6 +433,43 @@ respondPing nsSTM msgSet = do } pure $ serialiseMessage sendMessageSize pingResponse +respondLoadQuery :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondLoadQuery nsSTM msgSet = do + nsSnap <- readTVarIO nsSTM + -- this message cannot be split reasonably, so just + -- consider the first payload + let + aRequestPart = Set.elemAt 0 msgSet + senderPayload = extractFirstPayload msgSet + responsePl <- maybe (pure Nothing) (\pl -> + case pl of + LoadRequestPayload{} -> do + serv <- nodeService <$> readTVarIO (parentRealNode nsSnap) + lStats <- getServiceLoadStats serv + let + directSucc = getNid . head . predecessors $ nsSnap + handledTagSum = sum . takeRMapSuccessorsFromTo directSucc (loadSegmentUpperBound pl) $ loadPerTag lStats + pure $ Just LoadResponsePayload + { loadSum = handledTagSum + , loadRemainingTarget = remainingLoadTarget lStats + , loadSegmentLowerBound = directSucc + } + _ -> pure Nothing + ) + senderPayload + + pure $ maybe + Map.empty + (\pl -> serialiseMessage sendMessageSize $ Response + { requestID = requestID aRequestPart + , senderID = getNid nsSnap + , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 + , isFinalPart = False + , action = QueryLoad + , payload = Just pl + } + ) responsePl + respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do @@ -759,14 +803,7 @@ requestQueryLoad ns upperIdBound target = do responseMsgSet <- liftEither responses -- throws an error if an exception happened loadResPl <- maybe (throwError "no load response payload found") pure - (foldr' (\msg acc -> case payload msg of - -- just extract the first found LoadResponsePayload - Just pl@LoadResponsePayload{} | isNothing acc -> Just pl - _ -> Nothing - ) - Nothing - responseMsgSet - ) + (extractFirstPayload responseMsgSet) pure SegmentLoadStats { segmentLowerKeyBound = loadSegmentLowerBound loadResPl , segmentUpperKeyBound = upperIdBound