Compare commits
2 commits
3482876d9b
...
d5f502c05c
Author | SHA1 | Date | |
---|---|---|---|
|
d5f502c05c | ||
|
0494ddd696 |
|
@ -55,4 +55,5 @@ readConfig = do
|
|||
, confIP = toHostAddress6 . read $ ipString
|
||||
, confDhtPort = read portString
|
||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
||||
--, confStabiliseInterval = 60
|
||||
}
|
||||
|
|
|
@ -61,12 +61,13 @@ import Crypto.Hash
|
|||
import qualified Data.ByteArray as BA
|
||||
import qualified Data.ByteString as BS
|
||||
import qualified Data.ByteString.UTF8 as BSU
|
||||
import Data.Either (rights)
|
||||
import Data.Foldable (foldr')
|
||||
import Data.IP (IPv6, fromHostAddress6,
|
||||
toHostAddress6)
|
||||
import qualified Data.Map.Strict as Map
|
||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
||||
mapMaybe)
|
||||
import Data.Maybe (catMaybes, fromJust, fromMaybe,
|
||||
isJust, mapMaybe)
|
||||
import qualified Data.Set as Set
|
||||
import Data.Time.Clock.POSIX
|
||||
import Data.Typeable (Typeable (..), typeOf)
|
||||
|
@ -181,11 +182,16 @@ cacheWriter nsSTM =
|
|||
modifyTVar' (nodeCacheSTM ns) cacheModifier
|
||||
|
||||
stabiliseThread :: LocalNodeStateSTM -> IO ()
|
||||
stabiliseThread nsSTM = do
|
||||
stabiliseThread nsSTM = forever $ do
|
||||
ns <- readTVarIO nsSTM
|
||||
-- TODO: update successfully stabilised nodes in cache
|
||||
-- placeholder
|
||||
stabiliseNeighbour nsSTM successors 1
|
||||
pure ()
|
||||
-- first stabilise immediate neihbours, then the next ones
|
||||
forM_ [1..(kNeighbours ns)] (\num -> do
|
||||
stabiliseNeighbour nsSTM predecessors num
|
||||
stabiliseNeighbour nsSTM successors num
|
||||
)
|
||||
-- TODO: make delay configurable
|
||||
threadDelay (60 * 1000)
|
||||
where
|
||||
stabiliseNeighbour :: LocalNodeStateSTM
|
||||
-> (LocalNodeState -> [RemoteNodeState])
|
||||
|
@ -207,10 +213,25 @@ stabiliseThread nsSTM = do
|
|||
-> m ()
|
||||
parseNeighbourResponse (Left err) = throwError err
|
||||
parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
|
||||
-- ping each returned node before actually inserting them
|
||||
-- send pings in parallel, check whether ID is part of the returned IDs
|
||||
nsSnap <- readTVarIO nsSTM
|
||||
pingThreads <- mapM (async . checkReachability nsSnap) $ preds <> succs
|
||||
-- ToDo: exception handling, maybe log them
|
||||
-- filter out own node
|
||||
checkedNeighbours <- filter (/= toRemoteNodeState nsSnap) . catMaybes . rights <$> mapM waitCatch pingThreads
|
||||
|
||||
atomically $ do
|
||||
newerNsSnap <- readTVar nsSTM
|
||||
writeTVar nsSTM $ addPredecessors preds . addSuccessors succs $ newerNsSnap
|
||||
writeTVar nsSTM $ addPredecessors checkedNeighbours . addSuccessors checkedNeighbours $ newerNsSnap
|
||||
pure ()
|
||||
checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState)
|
||||
checkReachability ns toCheck = do
|
||||
resp <- requestPing ns toCheck
|
||||
pure $ either (const Nothing) (\vss ->
|
||||
if toCheck `elem` vss then Just toCheck else Nothing
|
||||
) resp
|
||||
|
||||
|
||||
-- periodically contact immediate successor and predecessor
|
||||
-- If they respond, see whether there is a closer neighbour in between
|
||||
|
|
|
@ -613,6 +613,7 @@ data FediChordConf = FediChordConf
|
|||
, confIP :: HostAddress6
|
||||
, confDhtPort :: Int
|
||||
, confBootstrapNodes :: [(String, PortNumber)]
|
||||
--, confStabiliseInterval :: Int
|
||||
}
|
||||
deriving (Show, Eq)
|
||||
|
||||
|
|
|
@ -154,7 +154,7 @@ spec = do
|
|||
describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do
|
||||
let
|
||||
emptyCache = initCache
|
||||
-- implicitly relies on kNieghbours to be <= 3
|
||||
-- implicitly relies on kNeighbours to be <= 3
|
||||
thisNid = toNodeID 1000
|
||||
thisNode = setNid thisNid <$> exampleLocalNode
|
||||
nid2 = toNodeID 1003
|
||||
|
|
Loading…
Reference in a new issue