Compare commits
2 commits
3482876d9b
...
d5f502c05c
Author | SHA1 | Date | |
---|---|---|---|
|
d5f502c05c | ||
|
0494ddd696 |
|
@ -55,4 +55,5 @@ readConfig = do
|
||||||
, confIP = toHostAddress6 . read $ ipString
|
, confIP = toHostAddress6 . read $ ipString
|
||||||
, confDhtPort = read portString
|
, confDhtPort = read portString
|
||||||
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
|
||||||
|
--, confStabiliseInterval = 60
|
||||||
}
|
}
|
||||||
|
|
|
@ -61,12 +61,13 @@ import Crypto.Hash
|
||||||
import qualified Data.ByteArray as BA
|
import qualified Data.ByteArray as BA
|
||||||
import qualified Data.ByteString as BS
|
import qualified Data.ByteString as BS
|
||||||
import qualified Data.ByteString.UTF8 as BSU
|
import qualified Data.ByteString.UTF8 as BSU
|
||||||
|
import Data.Either (rights)
|
||||||
import Data.Foldable (foldr')
|
import Data.Foldable (foldr')
|
||||||
import Data.IP (IPv6, fromHostAddress6,
|
import Data.IP (IPv6, fromHostAddress6,
|
||||||
toHostAddress6)
|
toHostAddress6)
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
import Data.Maybe (catMaybes, fromJust, fromMaybe,
|
||||||
mapMaybe)
|
isJust, mapMaybe)
|
||||||
import qualified Data.Set as Set
|
import qualified Data.Set as Set
|
||||||
import Data.Time.Clock.POSIX
|
import Data.Time.Clock.POSIX
|
||||||
import Data.Typeable (Typeable (..), typeOf)
|
import Data.Typeable (Typeable (..), typeOf)
|
||||||
|
@ -181,11 +182,16 @@ cacheWriter nsSTM =
|
||||||
modifyTVar' (nodeCacheSTM ns) cacheModifier
|
modifyTVar' (nodeCacheSTM ns) cacheModifier
|
||||||
|
|
||||||
stabiliseThread :: LocalNodeStateSTM -> IO ()
|
stabiliseThread :: LocalNodeStateSTM -> IO ()
|
||||||
stabiliseThread nsSTM = do
|
stabiliseThread nsSTM = forever $ do
|
||||||
|
ns <- readTVarIO nsSTM
|
||||||
-- TODO: update successfully stabilised nodes in cache
|
-- TODO: update successfully stabilised nodes in cache
|
||||||
-- placeholder
|
-- first stabilise immediate neihbours, then the next ones
|
||||||
stabiliseNeighbour nsSTM successors 1
|
forM_ [1..(kNeighbours ns)] (\num -> do
|
||||||
pure ()
|
stabiliseNeighbour nsSTM predecessors num
|
||||||
|
stabiliseNeighbour nsSTM successors num
|
||||||
|
)
|
||||||
|
-- TODO: make delay configurable
|
||||||
|
threadDelay (60 * 1000)
|
||||||
where
|
where
|
||||||
stabiliseNeighbour :: LocalNodeStateSTM
|
stabiliseNeighbour :: LocalNodeStateSTM
|
||||||
-> (LocalNodeState -> [RemoteNodeState])
|
-> (LocalNodeState -> [RemoteNodeState])
|
||||||
|
@ -207,10 +213,25 @@ stabiliseThread nsSTM = do
|
||||||
-> m ()
|
-> m ()
|
||||||
parseNeighbourResponse (Left err) = throwError err
|
parseNeighbourResponse (Left err) = throwError err
|
||||||
parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
|
parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
|
||||||
|
-- ping each returned node before actually inserting them
|
||||||
|
-- send pings in parallel, check whether ID is part of the returned IDs
|
||||||
|
nsSnap <- readTVarIO nsSTM
|
||||||
|
pingThreads <- mapM (async . checkReachability nsSnap) $ preds <> succs
|
||||||
|
-- ToDo: exception handling, maybe log them
|
||||||
|
-- filter out own node
|
||||||
|
checkedNeighbours <- filter (/= toRemoteNodeState nsSnap) . catMaybes . rights <$> mapM waitCatch pingThreads
|
||||||
|
|
||||||
atomically $ do
|
atomically $ do
|
||||||
newerNsSnap <- readTVar nsSTM
|
newerNsSnap <- readTVar nsSTM
|
||||||
writeTVar nsSTM $ addPredecessors preds . addSuccessors succs $ newerNsSnap
|
writeTVar nsSTM $ addPredecessors checkedNeighbours . addSuccessors checkedNeighbours $ newerNsSnap
|
||||||
pure ()
|
pure ()
|
||||||
|
checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState)
|
||||||
|
checkReachability ns toCheck = do
|
||||||
|
resp <- requestPing ns toCheck
|
||||||
|
pure $ either (const Nothing) (\vss ->
|
||||||
|
if toCheck `elem` vss then Just toCheck else Nothing
|
||||||
|
) resp
|
||||||
|
|
||||||
|
|
||||||
-- periodically contact immediate successor and predecessor
|
-- periodically contact immediate successor and predecessor
|
||||||
-- If they respond, see whether there is a closer neighbour in between
|
-- If they respond, see whether there is a closer neighbour in between
|
||||||
|
|
|
@ -613,6 +613,7 @@ data FediChordConf = FediChordConf
|
||||||
, confIP :: HostAddress6
|
, confIP :: HostAddress6
|
||||||
, confDhtPort :: Int
|
, confDhtPort :: Int
|
||||||
, confBootstrapNodes :: [(String, PortNumber)]
|
, confBootstrapNodes :: [(String, PortNumber)]
|
||||||
|
--, confStabiliseInterval :: Int
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
|
|
|
@ -154,7 +154,7 @@ spec = do
|
||||||
describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do
|
describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do
|
||||||
let
|
let
|
||||||
emptyCache = initCache
|
emptyCache = initCache
|
||||||
-- implicitly relies on kNieghbours to be <= 3
|
-- implicitly relies on kNeighbours to be <= 3
|
||||||
thisNid = toNodeID 1000
|
thisNid = toNodeID 1000
|
||||||
thisNode = setNid thisNid <$> exampleLocalNode
|
thisNode = setNid thisNid <$> exampleLocalNode
|
||||||
nid2 = toNodeID 1003
|
nid2 = toNodeID 1003
|
||||||
|
|
Loading…
Reference in a new issue