refactor vservers map to RingMap to be able to index it
- in preparation for periodical rebalancing - makes it possible to look up the next vserver for iterating through it, after refreshing the map in-between - added some necessary RingMap functions
This commit is contained in:
parent
bb0fb0919a
commit
5ed8a28fde
|
@ -91,10 +91,9 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
|
||||||
cacheGetNodeStateUnvalidated,
|
cacheGetNodeStateUnvalidated,
|
||||||
cacheLookup, cacheLookupPred,
|
cacheLookup, cacheLookupPred,
|
||||||
cacheLookupSucc, genNodeID,
|
cacheLookupSucc, genNodeID,
|
||||||
getKeyID, localCompare,
|
getKeyID, hasValidNodeId,
|
||||||
rMapFromList, rMapLookupPred,
|
localCompare, rMapFromList,
|
||||||
rMapLookupSucc,
|
rMapLookupPred, rMapLookupSucc,
|
||||||
hasValidNodeId,
|
|
||||||
remainingLoadTarget,
|
remainingLoadTarget,
|
||||||
setPredecessors, setSuccessors)
|
setPredecessors, setSuccessors)
|
||||||
import Hash2Pub.ProtocolTypes
|
import Hash2Pub.ProtocolTypes
|
||||||
|
|
|
@ -92,6 +92,7 @@ import System.Random (randomRIO)
|
||||||
|
|
||||||
import Hash2Pub.DHTProtocol
|
import Hash2Pub.DHTProtocol
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.FediChordTypes
|
||||||
|
import Hash2Pub.RingMap
|
||||||
import Hash2Pub.Utils
|
import Hash2Pub.Utils
|
||||||
|
|
||||||
import Debug.Trace (trace)
|
import Debug.Trace (trace)
|
||||||
|
@ -107,7 +108,7 @@ fediChordInit initConf serviceRunner = do
|
||||||
cacheSTM <- newTVarIO initCache
|
cacheSTM <- newTVarIO initCache
|
||||||
cacheQ <- atomically newTQueue
|
cacheQ <- atomically newTQueue
|
||||||
let realNode = RealNode
|
let realNode = RealNode
|
||||||
{ vservers = HMap.empty
|
{ vservers = emptyRMap
|
||||||
, nodeConfig = initConf
|
, nodeConfig = initConf
|
||||||
, bootstrapNodes = confBootstrapNodes initConf
|
, bootstrapNodes = confBootstrapNodes initConf
|
||||||
, lookupCacheSTM = emptyLookupCache
|
, lookupCacheSTM = emptyLookupCache
|
||||||
|
@ -218,10 +219,10 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do
|
||||||
alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node
|
alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node
|
||||||
unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do
|
unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do
|
||||||
joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM
|
joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM
|
||||||
if HMap.null joinedVss
|
if nullRMap joinedVss
|
||||||
then throwError "k-choices join unsuccessful, no vserver joined"
|
then throwError "k-choices join unsuccessful, no vserver joined"
|
||||||
else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
|
else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node'
|
||||||
{ vservers = HMap.union (vservers node') joinedVss }
|
{ vservers = unionRMap joinedVss (vservers node') }
|
||||||
|
|
||||||
where
|
where
|
||||||
vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
|
||||||
|
@ -233,7 +234,7 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do
|
||||||
|
|
||||||
(acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVsSTM bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad
|
(acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVsSTM bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad
|
||||||
-- on successful vserver join add the new VS to node and recurse
|
-- on successful vserver join add the new VS to node and recurse
|
||||||
vsJoins queryVsSTM capacity (HMap.insert newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM'
|
vsJoins queryVsSTM capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM'
|
||||||
-- on error, just reduce the amount of tries and retry
|
-- on error, just reduce the amount of tries and retry
|
||||||
`catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVsSTM capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM')
|
`catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVsSTM capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM')
|
||||||
|
|
||||||
|
@ -254,10 +255,9 @@ kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarg
|
||||||
conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM)
|
conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM)
|
||||||
-- generate all possible vs IDs
|
-- generate all possible vs IDs
|
||||||
let
|
let
|
||||||
activeVsSet = HMap.keysSet activeVss
|
|
||||||
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
|
-- tuples of node IDs and vserver IDs, because vserver IDs are needed for
|
||||||
-- LocalNodeState creation
|
-- LocalNodeState creation
|
||||||
nonJoinedIDs = filter (not . flip HSet.member activeVsSet . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]]
|
nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]]
|
||||||
queryVs <- liftIO $ readTVarIO queryVsSTM
|
queryVs <- liftIO $ readTVarIO queryVsSTM
|
||||||
|
|
||||||
-- query load of all possible segments
|
-- query load of all possible segments
|
||||||
|
@ -526,7 +526,7 @@ joinOnNewEntriesThread nodeSTM = loop
|
||||||
(lookupResult, conf, firstVSSTM) <- atomically $ do
|
(lookupResult, conf, firstVSSTM) <- atomically $ do
|
||||||
nodeSnap <- readTVar nodeSTM
|
nodeSnap <- readTVar nodeSTM
|
||||||
let conf = nodeConfig nodeSnap
|
let conf = nodeConfig nodeSnap
|
||||||
case headMay (HMap.elems $ vservers nodeSnap) of
|
case headMay (rMapToList $ vservers nodeSnap) of
|
||||||
Nothing -> retry
|
Nothing -> retry
|
||||||
Just vsSTM -> do
|
Just vsSTM -> do
|
||||||
-- take any active vserver as heuristic for whether this node has
|
-- take any active vserver as heuristic for whether this node has
|
||||||
|
@ -573,7 +573,7 @@ nodeCacheVerifyThread :: RealNodeSTM s -> IO ()
|
||||||
nodeCacheVerifyThread nodeSTM = forever $ do
|
nodeCacheVerifyThread nodeSTM = forever $ do
|
||||||
(node, firstVSSTM) <- atomically $ do
|
(node, firstVSSTM) <- atomically $ do
|
||||||
node <- readTVar nodeSTM
|
node <- readTVar nodeSTM
|
||||||
case headMay (HMap.elems $ vservers node) of
|
case headMay (rMapToList $ vservers node) of
|
||||||
-- wait until first VS is joined
|
-- wait until first VS is joined
|
||||||
Nothing -> retry
|
Nothing -> retry
|
||||||
Just vs' -> pure (node, vs')
|
Just vs' -> pure (node, vs')
|
||||||
|
@ -958,7 +958,7 @@ fediMessageHandler sendQ recvQ nodeSTM = do
|
||||||
|
|
||||||
pure ()
|
pure ()
|
||||||
where
|
where
|
||||||
dispatchVS node req = HMap.lookup (receiverID req) (vservers node)
|
dispatchVS node req = rMapLookup (receiverID req) (vservers node)
|
||||||
|
|
||||||
|
|
||||||
-- ==== interface to service layer ====
|
-- ==== interface to service layer ====
|
||||||
|
@ -1009,7 +1009,7 @@ updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
|
||||||
updateLookupCache nodeSTM keyToLookup = do
|
updateLookupCache nodeSTM keyToLookup = do
|
||||||
(node, lookupSource) <- atomically $ do
|
(node, lookupSource) <- atomically $ do
|
||||||
node <- readTVar nodeSTM
|
node <- readTVar nodeSTM
|
||||||
let firstVs = headMay (HMap.elems $ vservers node)
|
let firstVs = headMay (rMapToList $ vservers node)
|
||||||
lookupSource <- case firstVs of
|
lookupSource <- case firstVs of
|
||||||
Nothing -> pure Nothing
|
Nothing -> pure Nothing
|
||||||
Just vs -> Just <$> readTVar vs
|
Just vs -> Just <$> readTVar vs
|
||||||
|
|
|
@ -175,9 +175,9 @@ data RealNode s = RealNode
|
||||||
-- | insert a new vserver mapping into a node
|
-- | insert a new vserver mapping into a node
|
||||||
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
|
addVserver :: (NodeID, LocalNodeStateSTM s) -> RealNode s -> RealNode s
|
||||||
addVserver (key, nstate) node = node
|
addVserver (key, nstate) node = node
|
||||||
{ vservers = HMap.insert key nstate (vservers node) }
|
{ vservers = addRMapEntry key nstate (vservers node) }
|
||||||
|
|
||||||
type VSMap s = HashMap NodeID (LocalNodeStateSTM s)
|
type VSMap s = RingMap NodeID (LocalNodeStateSTM s)
|
||||||
type RealNodeSTM s = TVar (RealNode s)
|
type RealNodeSTM s = TVar (RealNode s)
|
||||||
|
|
||||||
-- | represents a node and all its important state
|
-- | represents a node and all its important state
|
||||||
|
|
|
@ -106,6 +106,23 @@ rMapSize rmap = fromIntegral $ Map.size innerMap - oneIfEntry rmap minBound - on
|
||||||
| isNothing (rMapLookup nid rmap') = 1
|
| isNothing (rMapLookup nid rmap') = 1
|
||||||
| otherwise = 0
|
| otherwise = 0
|
||||||
|
|
||||||
|
|
||||||
|
-- | whether the RingMap is empty (except for empty proxy entries)
|
||||||
|
nullRMap :: (Num k, Bounded k, Ord k)
|
||||||
|
=> RingMap k a
|
||||||
|
-> Bool
|
||||||
|
-- basic idea: look for a predecessor starting from the upper Map bound,
|
||||||
|
-- Nothing indicates no entry being found
|
||||||
|
nullRMap = isNothing . rMapLookupPred maxBound
|
||||||
|
|
||||||
|
|
||||||
|
-- | O(logn( Is the key a member of the RingMap?
|
||||||
|
memberRMap :: (Bounded k, Ord k)
|
||||||
|
=> k
|
||||||
|
-> RingMap k a
|
||||||
|
-> Bool
|
||||||
|
memberRMap key = isJust . rMapLookup key
|
||||||
|
|
||||||
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
|
-- | a wrapper around lookup functions, making the lookup redirectable by a @ProxyEntry@
|
||||||
-- to simulate a modular ring
|
-- to simulate a modular ring
|
||||||
lookupWrapper :: (Bounded k, Ord k, Num k)
|
lookupWrapper :: (Bounded k, Ord k, Num k)
|
||||||
|
@ -198,9 +215,11 @@ deleteRMapEntry nid = RingMap . Map.update modifier nid . getRingMap
|
||||||
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
modifier (ProxyEntry idPointer _) = Just (ProxyEntry idPointer Nothing)
|
||||||
modifier KeyEntry {} = Nothing
|
modifier KeyEntry {} = Nothing
|
||||||
|
|
||||||
|
-- TODO: rename this to elems
|
||||||
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
|
rMapToList :: (Bounded k, Ord k) => RingMap k a -> [a]
|
||||||
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
|
rMapToList = mapMaybe extractRingEntry . Map.elems . getRingMap
|
||||||
|
|
||||||
|
-- TODO: rename this to toList
|
||||||
rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)]
|
rMapToListWithKeys :: (Bounded k, Ord k) => RingMap k a -> [(k, a)]
|
||||||
rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
|
rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
|
||||||
maybe acc (\val -> (k, val):acc) $ extractRingEntry v
|
maybe acc (\val -> (k, val):acc) $ extractRingEntry v
|
||||||
|
@ -211,6 +230,13 @@ rMapToListWithKeys = Map.foldrWithKey (\k v acc ->
|
||||||
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
|
rMapFromList :: (Bounded k, Ord k) => [(k, a)] -> RingMap k a
|
||||||
rMapFromList = setRMapEntries
|
rMapFromList = setRMapEntries
|
||||||
|
|
||||||
|
|
||||||
|
-- | this just merges the underlying 'Map.Map' s and does not check whether the
|
||||||
|
-- ProxyEntry pointers are consistent, so better only create unions of
|
||||||
|
-- equal-pointered RingMaps
|
||||||
|
unionRMap :: (Bounded k, Ord k) => RingMap k a -> RingMap k a -> RingMap k a
|
||||||
|
unionRMap a b = RingMap $ Map.union (getRingMap a) (getRingMap b)
|
||||||
|
|
||||||
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
|
-- | takes up to i entries from a 'RingMap' by calling a getter function on a
|
||||||
-- *startAt* value and after that on the previously returned value.
|
-- *startAt* value and after that on the previously returned value.
|
||||||
-- Stops once i entries have been taken or an entry has been encountered twice
|
-- Stops once i entries have been taken or an entry has been encountered twice
|
||||||
|
|
|
@ -19,6 +19,7 @@ import Hash2Pub.ASN1Coding
|
||||||
import Hash2Pub.DHTProtocol
|
import Hash2Pub.DHTProtocol
|
||||||
import Hash2Pub.FediChord
|
import Hash2Pub.FediChord
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.FediChordTypes
|
||||||
|
import Hash2Pub.RingMap
|
||||||
|
|
||||||
spec :: Spec
|
spec :: Spec
|
||||||
spec = do
|
spec = do
|
||||||
|
@ -304,7 +305,7 @@ exampleNodeState = RemoteNodeState {
|
||||||
exampleLocalNode :: IO (LocalNodeState MockService)
|
exampleLocalNode :: IO (LocalNodeState MockService)
|
||||||
exampleLocalNode = do
|
exampleLocalNode = do
|
||||||
realNodeSTM <- newTVarIO $ RealNode {
|
realNodeSTM <- newTVarIO $ RealNode {
|
||||||
vservers = HMap.empty
|
vservers = emptyRMap
|
||||||
, nodeConfig = exampleFediConf
|
, nodeConfig = exampleFediConf
|
||||||
, bootstrapNodes = confBootstrapNodes exampleFediConf
|
, bootstrapNodes = confBootstrapNodes exampleFediConf
|
||||||
, nodeService = MockService
|
, nodeService = MockService
|
||||||
|
|
Loading…
Reference in a new issue