diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 51c23c5..4936e7c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -218,7 +218,7 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do -- kCoicesVsJoin until target is met – unless there's already an active & joined VS causing enough load alreadyJoinedVss <- liftIO $ foldM (\sumAcc vsSTM -> readTVarIO vsSTM >>= (\vs -> pure . (+) sumAcc $ if vsIsJoined vs then 1 else 0)) 0 $ vservers node unless (alreadyJoinedVss > 0 && compensatedLoadSum ownLoadStats >= joinLoadTarget) $ do - joinedVss <- vsJoins vs0 (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM + joinedVss <- vsJoins vs0STM (totalCapacity ownLoadStats) (vservers node) joinLoadTarget (fromIntegral initialJoins - alreadyJoinedVss) nodeSTM if nullRMap joinedVss then throwError "k-choices join unsuccessful, no vserver joined" else liftIO . atomically . modifyTVar' nodeSTM $ \node' -> node' @@ -226,17 +226,17 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do where vsJoins :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeState s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s) + => LocalNodeStateSTM s -> Double -> VSMap s -> Double -> Int -> RealNodeSTM s -> m (VSMap s) vsJoins _ _ vsmap _ 0 _ = pure vsmap - vsJoins queryVs capacity vsmap remainingTargetLoad remainingJoins nodeSTM' + vsJoins queryVsSTM capacity vsmap remainingTargetLoad remainingJoins nodeSTM' | remainingTargetLoad <= 0 = pure vsmap | otherwise = do - (acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVs bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad + (acquiredLoad, (newNid, newVs)) <- kChoicesVsJoin queryVsSTM bootstrapNode capacity vsmap nodeSTM' remainingTargetLoad -- on successful vserver join add the new VS to node and recurse - vsJoins queryVs capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM' + vsJoins queryVsSTM capacity (addRMapEntry newNid newVs vsmap) (remainingTargetLoad - acquiredLoad) (pred remainingJoins) nodeSTM' -- on error, just reduce the amount of tries and retry - `catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVs capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM') + `catchError` (\e -> liftIO (putStrLn e) >> vsJoins queryVsSTM capacity vsmap remainingTargetLoad (pred remainingJoins) nodeSTM') -- error cause 1: not a single queried node has responded -> indicates permanent failure -- error cause 2: only a certain join failed, just ignore that join target for now, but problem: it will be the chosen @@ -244,17 +244,39 @@ kChoicesNodeJoin nodeSTM bootstrapNode = do -- `catchError` (const . kChoicesVsJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeState s -- ^ vserver to be used for querying - -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping + => LocalNodeStateSTM s -- ^ vserver to be used for querying + -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrappinG -> Double -- ^ own capacity -> VSMap s -- ^ currently active VServers -> RealNodeSTM s -- ^ parent node is needed for initialising a new vserver -> Double -- ^ remaining load target -> m (Double, (NodeID, LocalNodeStateSTM s)) -- ^ on success return tuple of acquired load and newly acquired vserver -kChoicesVsJoin queryVs bootstrapNode capacity activeVss nodeSTM remainingTarget = do +kChoicesVsJoin queryVsSTM bootstrapNode capacity activeVss nodeSTM remainingTarget = do conf <- nodeConfig <$> liftIO (readTVarIO nodeSTM) -- generate all possible vs IDs - segmentLoads <- kChoicesSegmentLoads conf queryVs bootstrapNode activeVss + let + -- tuples of node IDs and vserver IDs, because vserver IDs are needed for + -- LocalNodeState creation + nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]] + queryVs <- liftIO $ readTVarIO queryVsSTM + + -- query load of all possible segments + -- simplification: treat each load lookup failure as a general unavailability of that segment + -- TODO: retries for transient failures + segmentLoads <- fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do + -- if bootstrap node is provided, do initial lookup via that + currentlyResponsible <- maybe + (requestQueryID queryVs vsNid) + (\bs -> bootstrapQueryId queryVsSTM bs vsNid) + bootstrapNode + segment <- requestQueryLoad queryVs vsNid currentlyResponsible + pure $ Just (segment, vsId, currentlyResponsible) + -- store segment stats and vserver ID together, so it's clear + -- which vs needs to be joined to acquire this segment + ) `catchError` const (pure Nothing) + ) + + -- cost calculation and sort by cost -- edge case: no possible ID has responded (mincost, vsId, toJoinOn) <- maybe (throwError "received no load information") pure @@ -281,39 +303,6 @@ kChoicesVsJoin queryVs bootstrapNode capacity activeVss nodeSTM remainingTarget -- changes the remainingLoadTarget at each vsJoin. This target change -- needs to be accounted for starting from the 2nd vsJoin. - --- | query the load of all still unjoined VS positions -kChoicesSegmentLoads :: (Service s (RealNodeSTM s), MonadError String m, MonadIO m) - => FediChordConf -- ^ config params needed for generating all possible VSs - -> LocalNodeState s -- ^ vserver to be used for querying - -> Maybe (String, PortNumber) -- ^ domain and port of a bootstrapping node, if bootstrapping - -> VSMap s -- ^ currently active VServers - -> m [(SegmentLoadStats, Word8, RemoteNodeState)] -kChoicesSegmentLoads conf queryVs bootstrapNode activeVss = do - let - -- tuples of node IDs and vserver IDs, because vserver IDs are needed for - -- LocalNodeState creation - nonJoinedIDs = filter (not . flip memberRMap activeVss . fst) [ (genNodeID (confIP conf) (confDomain conf) v, v) | v <- [0..pred (confKChoicesMaxVS conf)]] - - -- query load of all possible segments - -- simplification: treat each load lookup failure as a general unavailability of that segment - -- TODO: retries for transient failures - fmap catMaybes . forM nonJoinedIDs $ (\(vsNid, vsId) -> (do - -- if bootstrap node is provided, do initial lookup via that - currentlyResponsible <- maybe - (requestQueryID queryVs vsNid) - (\bs -> bootstrapQueryId queryVs bs vsNid) - bootstrapNode - segment <- requestQueryLoad queryVs vsNid currentlyResponsible - pure $ Just (segment, vsId, currentlyResponsible) - -- store segment stats and vserver ID together, so it's clear - -- which vs needs to be joined to acquire this segment - ) `catchError` const (pure Nothing) - ) - - - - kChoicesJoinCost :: Double -- ^ own remaining load target -> Double -- ^ own capacity -> SegmentLoadStats -- ^ load stats of neighbour vs @@ -335,19 +324,27 @@ kChoicesDepartureCost remainingOwnLoad ownCap thisSegmentLoad segment = kChoicesRebalanceThread :: (Service s (RealNodeSTM s)) => RealNodeSTM s -> IO () -kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS +kChoicesRebalanceThread nodeSTM = (confKChoicesRebalanceInterval . nodeConfig <$> readTVarIO nodeSTM) >>= rebalanceVS 0 where - rebalanceVS :: Int -> IO () - rebalanceVS interval = do + rebalanceVS :: NodeID -> Int -> IO () + rebalanceVS startAt interval = do threadDelay interval - -- query load of all possible available VS IDs - -- query load of all existing VSes neighbours - -- calculate all departure costs - -- select VS with lowest departure cost - -- calculate all relocation costs of that VS - -- if deciding to re-balance, first leave and then join - -- loop - rebalanceVS interval + (vsToRebalance', serv) <- atomically $ do + node <- readTVar nodeSTM + pure (rMapLookupPred 0 (vservers node), nodeService node) + maybe + -- wait and retry if no active VS available + (rebalanceVS startAt interval) + (\(vsNid, vsSTM) -> do + vs <- readTVarIO vsSTM + -- query neighbour node(s) load + -- query own load + -- calculate departure cost + -- if deciding to re-balance, first leave and then join + -- loop + rebalanceVS vsNid interval + ) + vsToRebalance' -- placeholder @@ -365,7 +362,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do ns <- readTVarIO nsSTM runExceptT $ do -- 1. get routed to the currently responsible node - currentlyResponsible <- bootstrapQueryId ns bootstrapNode $ getNid ns + currentlyResponsible <- bootstrapQueryId nsSTM bootstrapNode $ getNid ns liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node liftIO $ putStrLn "send a bootstrap Join" @@ -387,7 +384,7 @@ convergenceSampleThread nodeSTM = forever $ do let bss = bootstrapNodes parentNode randIndex <- liftIO $ randomRIO (0, length bss - 1) chosenNode <- maybe (throwError "invalid bootstrapping node index") pure $ atMay bss randIndex - currentlyResponsible <- bootstrapQueryId nsSnap chosenNode (getNid nsSnap) + currentlyResponsible <- bootstrapQueryId nsSTM chosenNode (getNid nsSnap) if getNid currentlyResponsible /= getNid nsSnap -- if mismatch, stabilise on the result, else do nothing then do @@ -438,11 +435,12 @@ tryBootstrapJoining nodeSTM = do -- | Look up a key just based on the responses of a single bootstrapping node. bootstrapQueryId :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) - => LocalNodeState s + => LocalNodeStateSTM s -> (String, PortNumber) -> NodeID -> m RemoteNodeState -bootstrapQueryId ns (bootstrapHost, bootstrapPort) targetID = do +bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do + ns <- liftIO $ readTVarIO nsSTM nodeConf <- nodeConfig <$> liftIO (readTVarIO $ parentRealNode ns) let srcAddr = confIP nodeConf -- IP address needed for ID generation, so look it up