parent
499c90e63a
commit
1ed0281417
|
@ -77,7 +77,7 @@ import Hash2Pub.ASN1Coding
|
||||||
import Hash2Pub.FediChordTypes (CacheEntry (..),
|
import Hash2Pub.FediChordTypes (CacheEntry (..),
|
||||||
CacheEntry (..),
|
CacheEntry (..),
|
||||||
FediChordConf (..),
|
FediChordConf (..),
|
||||||
HasKeyID (..),
|
HasKeyID (..), LoadStats (..),
|
||||||
LocalNodeState (..),
|
LocalNodeState (..),
|
||||||
LocalNodeStateSTM, NodeCache,
|
LocalNodeStateSTM, NodeCache,
|
||||||
NodeID, NodeState (..),
|
NodeID, NodeState (..),
|
||||||
|
@ -95,6 +95,7 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
|
||||||
rMapLookupSucc,
|
rMapLookupSucc,
|
||||||
setPredecessors, setSuccessors)
|
setPredecessors, setSuccessors)
|
||||||
import Hash2Pub.ProtocolTypes
|
import Hash2Pub.ProtocolTypes
|
||||||
|
import Hash2Pub.RingMap
|
||||||
|
|
||||||
import Debug.Trace (trace)
|
import Debug.Trace (trace)
|
||||||
|
|
||||||
|
@ -253,6 +254,15 @@ ackRequest req@Request{} = serialiseMessage sendMessageSize $ Response {
|
||||||
ackRequest _ = Map.empty
|
ackRequest _ = Map.empty
|
||||||
|
|
||||||
|
|
||||||
|
-- | extract the first payload from a received message set
|
||||||
|
extractFirstPayload :: Set.Set FediChordMessage -> Maybe ActionPayload
|
||||||
|
extractFirstPayload msgSet = foldr' (\msg plAcc ->
|
||||||
|
if isNothing plAcc && isJust (payload msg)
|
||||||
|
then payload msg
|
||||||
|
else plAcc
|
||||||
|
) Nothing msgSet
|
||||||
|
|
||||||
|
|
||||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||||
-- the response to be sent.
|
-- the response to be sent.
|
||||||
handleIncomingRequest :: Service s (RealNodeSTM s)
|
handleIncomingRequest :: Service s (RealNodeSTM s)
|
||||||
|
@ -282,6 +292,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
|
||||||
-- only when joined
|
-- only when joined
|
||||||
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
|
Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing
|
||||||
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
|
Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing
|
||||||
|
QueryLoad -> if isJoined ns then Just <$> respondLoadQuery nsSTM msgSet else pure Nothing
|
||||||
)
|
)
|
||||||
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
-- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1.
|
||||||
|
|
||||||
|
@ -321,11 +332,7 @@ respondQueryID nsSTM msgSet = do
|
||||||
let
|
let
|
||||||
aRequestPart = Set.elemAt 0 msgSet
|
aRequestPart = Set.elemAt 0 msgSet
|
||||||
senderID = getNid . sender $ aRequestPart
|
senderID = getNid . sender $ aRequestPart
|
||||||
senderPayload = foldr' (\msg plAcc ->
|
senderPayload = extractFirstPayload msgSet
|
||||||
if isNothing plAcc && isJust (payload msg)
|
|
||||||
then payload msg
|
|
||||||
else plAcc
|
|
||||||
) Nothing msgSet
|
|
||||||
-- return only empty message serialisation if no payload was included in parts
|
-- return only empty message serialisation if no payload was included in parts
|
||||||
maybe (pure Map.empty) (\senderPayload' -> do
|
maybe (pure Map.empty) (\senderPayload' -> do
|
||||||
responseMsg <- atomically $ do
|
responseMsg <- atomically $ do
|
||||||
|
@ -426,6 +433,43 @@ respondPing nsSTM msgSet = do
|
||||||
}
|
}
|
||||||
pure $ serialiseMessage sendMessageSize pingResponse
|
pure $ serialiseMessage sendMessageSize pingResponse
|
||||||
|
|
||||||
|
respondLoadQuery :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||||
|
respondLoadQuery nsSTM msgSet = do
|
||||||
|
nsSnap <- readTVarIO nsSTM
|
||||||
|
-- this message cannot be split reasonably, so just
|
||||||
|
-- consider the first payload
|
||||||
|
let
|
||||||
|
aRequestPart = Set.elemAt 0 msgSet
|
||||||
|
senderPayload = extractFirstPayload msgSet
|
||||||
|
responsePl <- maybe (pure Nothing) (\pl ->
|
||||||
|
case pl of
|
||||||
|
LoadRequestPayload{} -> do
|
||||||
|
serv <- nodeService <$> readTVarIO (parentRealNode nsSnap)
|
||||||
|
lStats <- getServiceLoadStats serv
|
||||||
|
let
|
||||||
|
directSucc = getNid . head . predecessors $ nsSnap
|
||||||
|
handledTagSum = sum . takeRMapSuccessorsFromTo directSucc (loadSegmentUpperBound pl) $ loadPerTag lStats
|
||||||
|
pure $ Just LoadResponsePayload
|
||||||
|
{ loadSum = handledTagSum
|
||||||
|
, loadRemainingTarget = remainingLoadTarget lStats
|
||||||
|
, loadSegmentLowerBound = directSucc
|
||||||
|
}
|
||||||
|
_ -> pure Nothing
|
||||||
|
)
|
||||||
|
senderPayload
|
||||||
|
|
||||||
|
pure $ maybe
|
||||||
|
Map.empty
|
||||||
|
(\pl -> serialiseMessage sendMessageSize $ Response
|
||||||
|
{ requestID = requestID aRequestPart
|
||||||
|
, senderID = getNid nsSnap
|
||||||
|
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||||
|
, isFinalPart = False
|
||||||
|
, action = QueryLoad
|
||||||
|
, payload = Just pl
|
||||||
|
}
|
||||||
|
) responsePl
|
||||||
|
|
||||||
|
|
||||||
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||||
respondJoin nsSTM msgSet = do
|
respondJoin nsSTM msgSet = do
|
||||||
|
@ -759,14 +803,7 @@ requestQueryLoad ns upperIdBound target = do
|
||||||
responseMsgSet <- liftEither responses
|
responseMsgSet <- liftEither responses
|
||||||
-- throws an error if an exception happened
|
-- throws an error if an exception happened
|
||||||
loadResPl <- maybe (throwError "no load response payload found") pure
|
loadResPl <- maybe (throwError "no load response payload found") pure
|
||||||
(foldr' (\msg acc -> case payload msg of
|
(extractFirstPayload responseMsgSet)
|
||||||
-- just extract the first found LoadResponsePayload
|
|
||||||
Just pl@LoadResponsePayload{} | isNothing acc -> Just pl
|
|
||||||
_ -> Nothing
|
|
||||||
)
|
|
||||||
Nothing
|
|
||||||
responseMsgSet
|
|
||||||
)
|
|
||||||
pure SegmentLoadStats
|
pure SegmentLoadStats
|
||||||
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
|
{ segmentLowerKeyBound = loadSegmentLowerBound loadResPl
|
||||||
, segmentUpperKeyBound = upperIdBound
|
, segmentUpperKeyBound = upperIdBound
|
||||||
|
|
Loading…
Reference in a new issue