parent
0494ddd696
commit
d5f502c05c
|
@ -61,12 +61,13 @@ import Crypto.Hash
|
||||||
import qualified Data.ByteArray as BA
|
import qualified Data.ByteArray as BA
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
|
import Data.Either (rights)
|
||||||
import Data.Foldable (foldr')
|
import Data.Foldable (foldr')
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
import Data.IP (IPv6, fromHostAddress6,
|
||||||
toHostAddress6)
|
toHostAddress6)
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
import Data.Maybe (catMaybes, fromJust, fromMaybe,
|
||||||
mapMaybe)
|
isJust, mapMaybe)
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Data.Typeable (Typeable (..), typeOf)
|
import Data.Typeable (Typeable (..), typeOf)
|
||||||
|
@ -212,10 +213,25 @@ stabiliseThread nsSTM = forever $ do
|
||||||
-> m ()
|
-> m ()
|
||||||
parseNeighbourResponse (Left err) = throwError err
|
parseNeighbourResponse (Left err) = throwError err
|
||||||
parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
|
parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
|
||||||
|
-- ping each returned node before actually inserting them
|
||||||
|
-- send pings in parallel, check whether ID is part of the returned IDs
|
||||||
|
nsSnap <- readTVarIO nsSTM
|
||||||
|
pingThreads <- mapM (async . checkReachability nsSnap) $ preds <> succs
|
||||||
|
-- ToDo: exception handling, maybe log them
|
||||||
|
-- filter out own node
|
||||||
|
checkedNeighbours <- filter (/= toRemoteNodeState nsSnap) . catMaybes . rights <$> mapM waitCatch pingThreads
|
||||||
|
|
||||||
atomically $ do
|
atomically $ do
|
||||||
newerNsSnap <- readTVar nsSTM
|
newerNsSnap <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addPredecessors preds . addSuccessors succs $ newerNsSnap
|
writeTVar nsSTM $ addPredecessors checkedNeighbours . addSuccessors checkedNeighbours $ newerNsSnap
|
||||||
pure ()
|
pure ()
|
||||||
|
checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState)
|
||||||
|
checkReachability ns toCheck = do
|
||||||
|
resp <- requestPing ns toCheck
|
||||||
|
pure $ either (const Nothing) (\vss ->
|
||||||
|
if toCheck `elem` vss then Just toCheck else Nothing
|
||||||
|
) resp
|
||||||
|
|
||||||
|
|
||||||
-- periodically contact immediate successor and predecessor
|
-- periodically contact immediate successor and predecessor
|
||||||
-- If they respond, see whether there is a closer neighbour in between
|
-- If they respond, see whether there is a closer neighbour in between
|
||||||
|
|
Loading…
Reference in a new issue