send load query request, parse result and represent it
- sending side of #71 - introduces SegmentLoadStats to hold the response data - contributes to #2
This commit is contained in:
parent
576ea2c3f6
commit
30bf0529ed
|
@ -22,6 +22,7 @@ module Hash2Pub.DHTProtocol
|
||||||
, requestLeave
|
, requestLeave
|
||||||
, requestPing
|
, requestPing
|
||||||
, requestStabilise
|
, requestStabilise
|
||||||
|
, requestQueryLoad
|
||||||
, lookupMessage
|
, lookupMessage
|
||||||
, sendRequestTo
|
, sendRequestTo
|
||||||
, queryIdLookupLoop
|
, queryIdLookupLoop
|
||||||
|
@ -49,7 +50,8 @@ import Control.Concurrent.STM.TQueue
|
||||||
import Control.Concurrent.STM.TVar
|
import Control.Concurrent.STM.TVar
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Control.Monad (foldM, forM, forM_, void, when)
|
import Control.Monad (foldM, forM, forM_, void, when)
|
||||||
import Control.Monad.Except (MonadError (..), runExceptT)
|
import Control.Monad.Except (MonadError (..), liftEither,
|
||||||
|
runExceptT)
|
||||||
import Control.Monad.IO.Class (MonadIO (..))
|
import Control.Monad.IO.Class (MonadIO (..))
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import Data.Either (rights)
|
import Data.Either (rights)
|
||||||
|
@ -81,6 +83,7 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
|
||||||
RealNode (..), RealNodeSTM,
|
RealNode (..), RealNodeSTM,
|
||||||
RemoteNodeState (..),
|
RemoteNodeState (..),
|
||||||
RingEntry (..), RingMap (..),
|
RingEntry (..), RingMap (..),
|
||||||
|
SegmentLoadStats (..),
|
||||||
Service (..), addRMapEntry,
|
Service (..), addRMapEntry,
|
||||||
addRMapEntryWith,
|
addRMapEntryWith,
|
||||||
cacheGetNodeStateUnvalidated,
|
cacheGetNodeStateUnvalidated,
|
||||||
|
@ -744,6 +747,53 @@ requestPing ns target = do
|
||||||
) responses
|
) responses
|
||||||
|
|
||||||
|
|
||||||
|
-- still need a particular vserver as LocalNodeState, because requests need a sender
|
||||||
|
requestQueryLoad :: (MonadError String m, MonadIO m)
|
||||||
|
=> LocalNodeState s
|
||||||
|
-> NodeID
|
||||||
|
-> NodeID
|
||||||
|
-> RemoteNodeState
|
||||||
|
-> m SegmentLoadStats
|
||||||
|
requestQueryLoad ns lowerIdBound upperIdBound target = do
|
||||||
|
nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns)
|
||||||
|
let
|
||||||
|
srcAddr = confIP nodeConf
|
||||||
|
loadPl = LoadRequestPayload
|
||||||
|
{ loadLowerBound = lowerIdBound
|
||||||
|
, loadUpperBound = upperIdBound
|
||||||
|
}
|
||||||
|
responses <- liftIO $ bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
|
||||||
|
(fmap Right . sendRequestTo (confRequestTimeout nodeConf) (confRequestRetries nodeConf) (\rid ->
|
||||||
|
Request {
|
||||||
|
requestID = rid
|
||||||
|
, sender = toRemoteNodeState ns
|
||||||
|
, part = 1
|
||||||
|
, isFinalPart = False
|
||||||
|
, action = QueryLoad
|
||||||
|
, payload = Just loadPl
|
||||||
|
}
|
||||||
|
)
|
||||||
|
) `catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||||
|
responseMsgSet <- liftEither responses
|
||||||
|
-- throws an error if an exception happened
|
||||||
|
loadPl <- maybe (throwError "no load response payload found") pure
|
||||||
|
(foldr' (\msg acc -> case payload msg of
|
||||||
|
-- just extract the first found LoadResponsePayload
|
||||||
|
Just pl@LoadResponsePayload{} | isNothing acc -> Just pl
|
||||||
|
_ -> Nothing
|
||||||
|
)
|
||||||
|
Nothing
|
||||||
|
responseMsgSet
|
||||||
|
)
|
||||||
|
pure SegmentLoadStats
|
||||||
|
{ segmentLowerKeyBound = lowerIdBound
|
||||||
|
, segmentUpperKeyBound = upperIdBound
|
||||||
|
, segmentLoad = loadSum loadPl
|
||||||
|
, segmentOwnerLoadTarget = loadRemainingTarget loadPl
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
-- | Generic function for sending a request over a connected socket and collecting the response.
|
-- | Generic function for sending a request over a connected socket and collecting the response.
|
||||||
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
|
||||||
sendRequestTo :: Int -- ^ timeout in milliseconds
|
sendRequestTo :: Int -- ^ timeout in milliseconds
|
||||||
|
|
|
@ -20,6 +20,7 @@ module Hash2Pub.FediChordTypes
|
||||||
, RealNodeSTM
|
, RealNodeSTM
|
||||||
, LoadStats (..)
|
, LoadStats (..)
|
||||||
, emptyLoadStats
|
, emptyLoadStats
|
||||||
|
, SegmentLoadStats (..)
|
||||||
, setSuccessors
|
, setSuccessors
|
||||||
, setPredecessors
|
, setPredecessors
|
||||||
, NodeCache
|
, NodeCache
|
||||||
|
@ -436,11 +437,25 @@ data FediChordConf = FediChordConf
|
||||||
|
|
||||||
data LoadStats = LoadStats
|
data LoadStats = LoadStats
|
||||||
{ loadPerTag :: RingMap NodeID Double
|
{ loadPerTag :: RingMap NodeID Double
|
||||||
|
-- ^ map of loads for each handled tag
|
||||||
, totalCapacity :: Double
|
, totalCapacity :: Double
|
||||||
|
-- ^ total designated capacity of the service
|
||||||
, remainingLoadTarget :: Double
|
, remainingLoadTarget :: Double
|
||||||
|
-- ^ current mismatch between actual load and target load/capacity
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
data SegmentLoadStats = SegmentLoadStats
|
||||||
|
{ segmentLowerKeyBound :: NodeID
|
||||||
|
-- ^ segment start key
|
||||||
|
, segmentUpperKeyBound :: NodeID
|
||||||
|
-- ^ segment end key
|
||||||
|
, segmentLoad :: Double
|
||||||
|
-- ^ sum of load of all keys in the segment
|
||||||
|
, segmentOwnerLoadTarget :: Double
|
||||||
|
-- ^ remaining load target of the current segment handler
|
||||||
|
}
|
||||||
|
|
||||||
-- TODO: figure out a better way of initialising
|
-- TODO: figure out a better way of initialising
|
||||||
emptyLoadStats :: LoadStats
|
emptyLoadStats :: LoadStats
|
||||||
emptyLoadStats = LoadStats
|
emptyLoadStats = LoadStats
|
||||||
|
|
Loading…
Reference in a new issue