diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index c86c0f1..37a1dea 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -22,6 +22,7 @@ module Hash2Pub.DHTProtocol , requestLeave , requestPing , requestStabilise + , requestQueryLoad , lookupMessage , sendRequestTo , queryIdLookupLoop @@ -49,7 +50,8 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_, void, when) -import Control.Monad.Except (MonadError (..), runExceptT) +import Control.Monad.Except (MonadError (..), liftEither, + runExceptT) import Control.Monad.IO.Class (MonadIO (..)) import qualified Data.ByteString as BS import Data.Either (rights) @@ -81,6 +83,7 @@ import Hash2Pub.FediChordTypes (CacheEntry (..), RealNode (..), RealNodeSTM, RemoteNodeState (..), RingEntry (..), RingMap (..), + SegmentLoadStats (..), Service (..), addRMapEntry, addRMapEntryWith, cacheGetNodeStateUnvalidated, @@ -744,6 +747,53 @@ requestPing ns target = do ) responses +-- still need a particular vserver as LocalNodeState, because requests need a sender +requestQueryLoad :: (MonadError String m, MonadIO m) + => LocalNodeState s + -> NodeID + -> NodeID + -> RemoteNodeState + -> m SegmentLoadStats +requestQueryLoad ns lowerIdBound upperIdBound target = do + nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) + let + srcAddr = confIP nodeConf + loadPl = LoadRequestPayload + { loadLowerBound = lowerIdBound + , loadUpperBound = upperIdBound + } + responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close + (fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid -> + Request { + requestID = rid + , sender = toRemoteNodeState ns + , part = 1 + , isFinalPart = False + , action = QueryLoad + , payload = Just loadPl + } + ) + ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) + responseMsgSet <- liftEither responses + -- throws an error if an exception happened + loadPl <- maybe (throwError "no load response payload found") pure + (foldr' (\msg acc -> case payload msg of + -- just extract the first found LoadResponsePayload + Just pl@LoadResponsePayload{} | isNothing acc -> Just pl + _ -> Nothing + ) + Nothing + responseMsgSet + ) + pure SegmentLoadStats + { segmentLowerKeyBound = lowerIdBound + , segmentUpperKeyBound = upperIdBound + , segmentLoad = loadSum loadPl + , segmentOwnerLoadTarget = loadRemainingTarget loadPl + } + + + -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. sendRequestTo :: Int -- ^ timeout in milliseconds diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index af3d285..d8bbe4c 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -20,6 +20,7 @@ module Hash2Pub.FediChordTypes , RealNodeSTM , LoadStats (..) , emptyLoadStats + , SegmentLoadStats (..) , setSuccessors , setPredecessors , NodeCache @@ -436,11 +437,25 @@ data FediChordConf = FediChordConf data LoadStats = LoadStats { loadPerTag :: RingMap NodeID Double + -- ^ map of loads for each handled tag , totalCapacity :: Double + -- ^ total designated capacity of the service , remainingLoadTarget :: Double + -- ^ current mismatch between actual load and target load/capacity } deriving (Show, Eq) +data SegmentLoadStats = SegmentLoadStats + { segmentLowerKeyBound :: NodeID + -- ^ segment start key + , segmentUpperKeyBound :: NodeID + -- ^ segment end key + , segmentLoad :: Double + -- ^ sum of load of all keys in the segment + , segmentOwnerLoadTarget :: Double + -- ^ remaining load target of the current segment handler + } + -- TODO: figure out a better way of initialising emptyLoadStats :: LoadStats emptyLoadStats = LoadStats