forked from schmittlauch/Hash2Pub
refactored stabilise: use first responding neighbour
contributes to #44
This commit is contained in:
parent
d5f502c05c
commit
111c1a299d
|
@ -65,9 +65,10 @@ import Data.Either (rights)
|
|||
import Data.Foldable (foldr')
|
||||
import Data.IP (IPv6, fromHostAddress6,
|
||||
toHostAddress6)
|
||||
import Data.List ((\\))
|
||||
import qualified Data.Map.Strict as Map
|
||||
import Data.Maybe (catMaybes, fromJust, fromMaybe,
|
||||
isJust, mapMaybe)
|
||||
isJust, isNothing, mapMaybe)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX
|
||||
import Data.Typeable (Typeable (..), typeOf)
|
||||
|
@ -184,47 +185,67 @@ cacheWriter nsSTM =
|
|||
stabiliseThread :: LocalNodeStateSTM -> IO ()
|
||||
stabiliseThread nsSTM = forever $ do
|
||||
ns <- readTVarIO nsSTM
|
||||
|
||||
-- iterate through the same snapshot, collect potential new neighbours
|
||||
-- and nodes to be deleted, and modify these changes only at the end of
|
||||
-- each stabilise run.
|
||||
-- This decision makes iterating through a potentially changing list easier.
|
||||
|
||||
-- don't contact all neighbours unless the previous one failed/ Left ed
|
||||
|
||||
predStabilise <- stabiliseClosestResponder ns predecessors 1 []
|
||||
succStabilise <- stabiliseClosestResponder ns predecessors 1 []
|
||||
|
||||
let
|
||||
(predDeletes, predNeighbours) = either (const ([], [])) id predStabilise
|
||||
(succDeletes, succNeighbours) = either (const ([], [])) id succStabilise
|
||||
allDeletes = predDeletes <> succDeletes
|
||||
allNeighbours = predNeighbours <> succNeighbours
|
||||
|
||||
-- now actually modify the node state's neighbours
|
||||
updatedNs <- atomically $ do
|
||||
newerNsSnap <- readTVar nsSTM
|
||||
let
|
||||
-- sorting and taking only k neighbours is taken care of by the
|
||||
-- setSuccessors/ setPredecessors functions
|
||||
newPreds = (predecessors newerNsSnap \\ allDeletes) <> allNeighbours
|
||||
newSuccs = (successors newerNsSnap \\ allDeletes) <> allNeighbours
|
||||
newNs = setPredecessors newPreds . setSuccessors newSuccs $ newerNsSnap
|
||||
writeTVar nsSTM newNs
|
||||
pure newNs
|
||||
|
||||
-- TODO: update successfully stabilised nodes in cache
|
||||
-- first stabilise immediate neihbours, then the next ones
|
||||
forM_ [1..(kNeighbours ns)] (\num -> do
|
||||
stabiliseNeighbour nsSTM predecessors num
|
||||
stabiliseNeighbour nsSTM successors num
|
||||
)
|
||||
|
||||
-- try looking up additional neighbours if list too short
|
||||
|
||||
-- TODO: make delay configurable
|
||||
threadDelay (60 * 1000)
|
||||
where
|
||||
stabiliseNeighbour :: LocalNodeStateSTM
|
||||
stabiliseClosestResponder :: LocalNodeState
|
||||
-> (LocalNodeState -> [RemoteNodeState])
|
||||
-> Int
|
||||
-> IO (Either String ())
|
||||
stabiliseNeighbour nsSTM neighbourGetter neighbourNum = do
|
||||
nsSnap <- readTVarIO nsSTM
|
||||
let chosenNode = maybe (Left "no such neighbour entry") Right $ atMay (neighbourGetter nsSnap) neighbourNum
|
||||
-> [RemoteNodeState] -- ^ delete accumulator
|
||||
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (nodes to be deleted, successfully pinged potential neighbours)
|
||||
stabiliseClosestResponder ns neighbourGetter neighbourNum deleteAcc
|
||||
| isNothing (currentNeighbour ns neighbourGetter neighbourNum) = pure $ Left "exhausted all neigbours"
|
||||
| otherwise = do
|
||||
let node = fromJust $ currentNeighbour ns neighbourGetter neighbourNum
|
||||
stabResponse <- requestStabilise ns node
|
||||
case stabResponse of
|
||||
-- returning @Left@ signifies the need to try again with next from list
|
||||
runExceptT $ requestToNeighbour nsSnap chosenNode >>= parseNeighbourResponse
|
||||
requestToNeighbour :: (MonadError String m, MonadIO m)
|
||||
=> LocalNodeState
|
||||
-> Either String RemoteNodeState
|
||||
-> m (Either String ([RemoteNodeState],[RemoteNodeState]))
|
||||
requestToNeighbour _ (Left err) = throwError err
|
||||
requestToNeighbour ns (Right n) = liftIO $ requestStabilise ns n
|
||||
parseNeighbourResponse :: (MonadError String m, MonadIO m)
|
||||
=> Either String ([RemoteNodeState], [RemoteNodeState])
|
||||
-> m ()
|
||||
parseNeighbourResponse (Left err) = throwError err
|
||||
parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
|
||||
Left err -> stabiliseClosestResponder ns neighbourGetter (neighbourNum+1) (node:deleteAcc)
|
||||
Right (succs, preds) -> do
|
||||
-- ping each returned node before actually inserting them
|
||||
-- send pings in parallel, check whether ID is part of the returned IDs
|
||||
nsSnap <- readTVarIO nsSTM
|
||||
pingThreads <- mapM (async . checkReachability nsSnap) $ preds <> succs
|
||||
pingThreads <- mapM (async . checkReachability ns) $ preds <> succs
|
||||
-- ToDo: exception handling, maybe log them
|
||||
-- filter out own node
|
||||
checkedNeighbours <- filter (/= toRemoteNodeState nsSnap) . catMaybes . rights <$> mapM waitCatch pingThreads
|
||||
checkedNeighbours <- filter (/= toRemoteNodeState ns) . catMaybes . rights <$> mapM waitCatch pingThreads
|
||||
pure $ Right (deleteAcc, checkedNeighbours)
|
||||
|
||||
|
||||
currentNeighbour ns neighbourGetter = atMay $ neighbourGetter ns
|
||||
|
||||
atomically $ do
|
||||
newerNsSnap <- readTVar nsSTM
|
||||
writeTVar nsSTM $ addPredecessors checkedNeighbours . addSuccessors checkedNeighbours $ newerNsSnap
|
||||
pure ()
|
||||
checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState)
|
||||
checkReachability ns toCheck = do
|
||||
resp <- requestPing ns toCheck
|
||||
|
|
Loading…
Reference in a new issue