diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 4936e7c..51c23c5 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -218,7 +218,7 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do -- kCoicesVsJoin until target is met – unless there's already an active & joined VS causing enough load alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do - joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM + joinedVss <- vsJoins vs0 (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM if nullRMap joinedVss then throwError "k-choices join unsuccessful, no vserver joined" else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node' @@ -226,17 +226,17 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do where vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeStateSTM s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s) + => LocalNodeState s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s) vsJoins _ _ vsmap _ 0 _ = pure vsmap - vsJoins queryVsSTM capacity vsmap remainingTargetLoad remainingJoins nodeSTM' + vsJoins queryVs capacity vsmap remainingTargetLoad remainingJoins nodeSTM' | remainingTargetLoad <= 0 = pure vsmap | otherwise = do - (acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVsSTM bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad + (acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVs bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad -- on successful vserver join add the new VS to node and recurse - vsJoins queryVsSTM capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM' + vsJoins queryVs capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM' -- on error, just reduce the amount of tries and retry - `catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVsSTM capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM') + `catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVs capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM') -- error cause 1: not a single queried node has responded -> indicates permanent failure -- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen @@ -244,39 +244,17 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do -- `catchError` (const . kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeStateSTM s -- ^ vserver to be used for querying - -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrappinG + => LocalNodeState s -- ^ vserver to be used for querying + -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping -> Double -- ^ own capacity -> VSMap s -- ^ currently active VServers -> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver -> Double -- ^ remaining load target -> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver -kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarget = do +kChoicesVsJoin queryVs bootstrapNode capacity activeVss nodeSTM remainingTarget = do conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM) -- generate all possible vs IDs - let - -- tuples of node IDs and vserver IDs, because vserver IDs are needed for - -- LocalNodeState creation - nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]] - queryVs <- liftIO $ readTVarIO queryVsSTM - - -- query load of all possible segments - -- simplification: treat each load lookup failure as a general unavailability of that segment - -- TODO: retries for transient failures - segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do - -- if bootstrap node is provided, do initial lookup via that - currentlyResponsible <- maybe - (requestQueryID queryVs vsNid) - (\bs -> bootstrapQueryId queryVsSTM bs vsNid) - bootstrapNode - segment <- requestQueryLoad queryVs vsNid currentlyResponsible - pure $ Just (segment, vsId, currentlyResponsible) - -- store segment stats and vserver ID together, so it's clear - -- which vs needs to be joined to acquire this segment - ) `catchError` const (pure Nothing) - ) - - + segmentLoads <- kChoicesSegmentLoads conf queryVs bootstrapNode activeVss -- cost calculation and sort by cost -- edge case: no possible ID has responded (mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure @@ -303,6 +281,39 @@ kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarg -- changes the remainingLoadTarget at each vsJoin. This target change -- needs to be accounted for starting from the 2nd vsJoin. + +-- | query the load of all still unjoined VS positions +kChoicesSegmentLoads :: (Service s (RealNodeSTM s), MonadError String m, MonadIO m) + => FediChordConf -- ^ config params needed for generating all possible VSs + -> LocalNodeState s -- ^ vserver to be used for querying + -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping + -> VSMap s -- ^ currently active VServers + -> m [(SegmentLoadStats, Word8, RemoteNodeState)] +kChoicesSegmentLoads conf queryVs bootstrapNode activeVss = do + let + -- tuples of node IDs and vserver IDs, because vserver IDs are needed for + -- LocalNodeState creation + nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]] + + -- query load of all possible segments + -- simplification: treat each load lookup failure as a general unavailability of that segment + -- TODO: retries for transient failures + fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do + -- if bootstrap node is provided, do initial lookup via that + currentlyResponsible <- maybe + (requestQueryID queryVs vsNid) + (\bs -> bootstrapQueryId queryVs bs vsNid) + bootstrapNode + segment <- requestQueryLoad queryVs vsNid currentlyResponsible + pure $ Just (segment, vsId, currentlyResponsible) + -- store segment stats and vserver ID together, so it's clear + -- which vs needs to be joined to acquire this segment + ) `catchError` const (pure Nothing) + ) + + + + kChoicesJoinCost :: Double -- ^ own remaining load target -> Double -- ^ own capacity -> SegmentLoadStats -- ^ load stats of neighbour vs @@ -324,27 +335,19 @@ kChoicesDepartureCost remainingOwnLoad ownCap thisSegmentLoad segment = kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO () -kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS 0 +kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS where - rebalanceVS :: NodeID -> Int -> IO () - rebalanceVS startAt interval = do + rebalanceVS :: Int -> IO () + rebalanceVS interval = do threadDelay interval - (vsToRebalance', serv) <- atomically $ do - node <- readTVar nodeSTM - pure (rMapLookupPred 0 (vservers node), nodeService node) - maybe - -- wait and retry if no active VS available - (rebalanceVS startAt interval) - (\(vsNid, vsSTM) -> do - vs <- readTVarIO vsSTM - -- query neighbour node(s) load - -- query own load - -- calculate departure cost - -- if deciding to re-balance, first leave and then join - -- loop - rebalanceVS vsNid interval - ) - vsToRebalance' + -- query load of all possible available VS IDs + -- query load of all existing VSes neighbours + -- calculate all departure costs + -- select VS with lowest departure cost + -- calculate all relocation costs of that VS + -- if deciding to re-balance, first leave and then join + -- loop + rebalanceVS interval -- placeholder @@ -362,7 +365,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do ns <- readTVarIO nsSTM runExceptT $ do -- 1. get routed to the currently responsible node - currentlyResponsible <- bootstrapQueryId nsSTM bootstrapNode $ getNid ns + currentlyResponsible <- bootstrapQueryId ns bootstrapNode $ getNid ns liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node liftIO $ putStrLn "send a bootstrap Join" @@ -384,7 +387,7 @@ convergenceSampleThread nodeSTM = forever $ do let bss = bootstrapNodes parentNode randIndex <- liftIO $ randomRIO (0, length bss - 1) chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex - currentlyResponsible <- bootstrapQueryId nsSTM chosenNode (getNid nsSnap) + currentlyResponsible <- bootstrapQueryId nsSnap chosenNode (getNid nsSnap) if getNid currentlyResponsible /= getNid nsSnap -- if mismatch, stabilise on the result, else do nothing then do @@ -435,12 +438,11 @@ tryBootstrapJoining nodeSTM = do -- | Look up a key just based on the responses of a single bootstrapping node. bootstrapQueryId :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeStateSTM s + => LocalNodeState s -> (String, PortNumber) -> NodeID -> m RemoteNodeState -bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do - ns <- liftIO $ readTVarIO nsSTM +bootstrapQueryId ns (bootstrapHost, bootstrapPort) targetID = do nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) let srcAddr = confIP nodeConf -- IP address needed for ID generation, so look it up