From 111c1a299d646991a49320315442f353464cb856 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 23 Jun 2020 19:33:54 +0200 Subject: [PATCH] refactored stabilise: use first responding neighbour contributes to #44 --- src/Hash2Pub/FediChord.hs | 91 ++++++++++++++++++++++++--------------- 1 file changed, 56 insertions(+), 35 deletions(-) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index e1ec96b..a5967ad 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -65,9 +65,10 @@ import Data.Either (rights) import Data.Foldable (foldr') import Data.IP (IPv6, fromHostAddress6, toHostAddress6) +import Data.List ((\\)) import qualified Data.Map.Strict as Map import Data.Maybe (catMaybes, fromJust, fromMaybe, - isJust, mapMaybe) + isJust, isNothing, mapMaybe) import qualified Data.Set as Set import Data.Time.Clock.POSIX import Data.Typeable (Typeable (..), typeOf) @@ -184,47 +185,67 @@ cacheWriter nsSTM = stabiliseThread :: LocalNodeStateSTM -> IO () stabiliseThread nsSTM = forever $ do ns <- readTVarIO nsSTM + + -- iterate through the same snapshot, collect potential new neighbours + -- and nodes to be deleted, and modify these changes only at the end of + -- each stabilise run. + -- This decision makes iterating through a potentially changing list easier. + + -- don't contact all neighbours unless the previous one failed/ Left ed + + predStabilise <- stabiliseClosestResponder ns predecessors 1 [] + succStabilise <- stabiliseClosestResponder ns predecessors 1 [] + + let + (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise + (succDeletes, succNeighbours) = either (const ([], [])) id succStabilise + allDeletes = predDeletes <> succDeletes + allNeighbours = predNeighbours <> succNeighbours + + -- now actually modify the node state's neighbours + updatedNs <- atomically $ do + newerNsSnap <- readTVar nsSTM + let + -- sorting and taking only k neighbours is taken care of by the + -- setSuccessors/ setPredecessors functions + newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours + newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours + newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap + writeTVar nsSTM newNs + pure newNs + -- TODO: update successfully stabilised nodes in cache - -- first stabilise immediate neihbours, then the next ones - forM_ [1..(kNeighbours ns)] (\num -> do - stabiliseNeighbour nsSTM predecessors num - stabiliseNeighbour nsSTM successors num - ) + + -- try looking up additional neighbours if list too short + -- TODO: make delay configurable threadDelay (60 * 1000) where - stabiliseNeighbour :: LocalNodeStateSTM + stabiliseClosestResponder :: LocalNodeState -> (LocalNodeState -> [RemoteNodeState]) -> Int - -> IO (Either String ()) - stabiliseNeighbour nsSTM neighbourGetter neighbourNum = do - nsSnap <- readTVarIO nsSTM - let chosenNode = maybe (Left "no such neighbour entry") Right $ atMay (neighbourGetter nsSnap) neighbourNum - -- returning @Left@ signifies the need to try again with next from list - runExceptT $ requestToNeighbour nsSnap chosenNode >>= parseNeighbourResponse - requestToNeighbour :: (MonadError String m, MonadIO m) - => LocalNodeState - -> Either String RemoteNodeState - -> m (Either String ([RemoteNodeState],[RemoteNodeState])) - requestToNeighbour _ (Left err) = throwError err - requestToNeighbour ns (Right n) = liftIO $ requestStabilise ns n - parseNeighbourResponse :: (MonadError String m, MonadIO m) - => Either String ([RemoteNodeState], [RemoteNodeState]) - -> m () - parseNeighbourResponse (Left err) = throwError err - parseNeighbourResponse (Right (succs, preds)) = liftIO $ do - -- ping each returned node before actually inserting them - -- send pings in parallel, check whether ID is part of the returned IDs - nsSnap <- readTVarIO nsSTM - pingThreads <- mapM (async . checkReachability nsSnap) $ preds <> succs - -- ToDo: exception handling, maybe log them - -- filter out own node - checkedNeighbours <- filter (/= toRemoteNodeState nsSnap) . catMaybes . rights <$> mapM waitCatch pingThreads + -> [RemoteNodeState] -- ^ delete accumulator + -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours) + stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc + | isNothing (currentNeighbour ns neighbourGetter neighbourNum) = pure $ Left "exhausted all neigbours" + | otherwise = do + let node = fromJust $ currentNeighbour ns neighbourGetter neighbourNum + stabResponse <- requestStabilise ns node + case stabResponse of + -- returning @Left@ signifies the need to try again with next from list + Left err -> stabiliseClosestResponder ns neighbourGetter (neighbourNum+1) (node:deleteAcc) + Right (succs, preds) -> do + -- ping each returned node before actually inserting them + -- send pings in parallel, check whether ID is part of the returned IDs + pingThreads <- mapM (async . checkReachability ns) $ preds <> succs + -- ToDo: exception handling, maybe log them + -- filter out own node + checkedNeighbours <- filter (/= toRemoteNodeState ns) . catMaybes . rights <$> mapM waitCatch pingThreads + pure $ Right (deleteAcc, checkedNeighbours) + + + currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns - atomically $ do - newerNsSnap <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors checkedNeighbours . addSuccessors checkedNeighbours $ newerNsSnap - pure () checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState) checkReachability ns toCheck = do resp <- requestPing ns toCheck