Compare commits
	
		
			2 commits
		
	
	
		
			3482876d9b
			...
			d5f502c05c
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| d5f502c05c | |||
| 0494ddd696 | 
					 4 changed files with 31 additions and 8 deletions
				
			
		| 
						 | 
				
			
			@ -55,4 +55,5 @@ readConfig = do
 | 
			
		|||
      , confIP = toHostAddress6 . read $ ipString
 | 
			
		||||
      , confDhtPort = read portString
 | 
			
		||||
      , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
 | 
			
		||||
      --, confStabiliseInterval = 60
 | 
			
		||||
                           }
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -61,12 +61,13 @@ import           Crypto.Hash
 | 
			
		|||
import qualified Data.ByteArray                as BA
 | 
			
		||||
import qualified Data.ByteString               as BS
 | 
			
		||||
import qualified Data.ByteString.UTF8          as BSU
 | 
			
		||||
import           Data.Either                   (rights)
 | 
			
		||||
import           Data.Foldable                 (foldr')
 | 
			
		||||
import           Data.IP                       (IPv6, fromHostAddress6,
 | 
			
		||||
                                                toHostAddress6)
 | 
			
		||||
import qualified Data.Map.Strict               as Map
 | 
			
		||||
import           Data.Maybe                    (fromJust, fromMaybe, isJust,
 | 
			
		||||
                                                mapMaybe)
 | 
			
		||||
import           Data.Maybe                    (catMaybes, fromJust, fromMaybe,
 | 
			
		||||
                                                isJust, mapMaybe)
 | 
			
		||||
import qualified Data.Set                      as Set
 | 
			
		||||
import           Data.Time.Clock.POSIX
 | 
			
		||||
import           Data.Typeable                 (Typeable (..), typeOf)
 | 
			
		||||
| 
						 | 
				
			
			@ -181,11 +182,16 @@ cacheWriter nsSTM =
 | 
			
		|||
        modifyTVar' (nodeCacheSTM ns) cacheModifier
 | 
			
		||||
 | 
			
		||||
stabiliseThread :: LocalNodeStateSTM -> IO ()
 | 
			
		||||
stabiliseThread nsSTM = do
 | 
			
		||||
stabiliseThread nsSTM = forever $ do
 | 
			
		||||
    ns <- readTVarIO nsSTM
 | 
			
		||||
    -- TODO: update successfully stabilised nodes in cache
 | 
			
		||||
    -- placeholder
 | 
			
		||||
    stabiliseNeighbour nsSTM successors 1
 | 
			
		||||
    pure ()
 | 
			
		||||
    -- first stabilise immediate neihbours, then the next ones
 | 
			
		||||
    forM_ [1..(kNeighbours ns)] (\num -> do
 | 
			
		||||
        stabiliseNeighbour nsSTM predecessors num
 | 
			
		||||
        stabiliseNeighbour nsSTM successors num
 | 
			
		||||
                               )
 | 
			
		||||
    -- TODO: make delay configurable
 | 
			
		||||
    threadDelay (60 * 1000)
 | 
			
		||||
  where
 | 
			
		||||
    stabiliseNeighbour :: LocalNodeStateSTM
 | 
			
		||||
                       -> (LocalNodeState -> [RemoteNodeState])
 | 
			
		||||
| 
						 | 
				
			
			@ -207,10 +213,25 @@ stabiliseThread nsSTM = do
 | 
			
		|||
                           -> m ()
 | 
			
		||||
    parseNeighbourResponse (Left err) = throwError err
 | 
			
		||||
    parseNeighbourResponse (Right (succs, preds)) = liftIO $ do
 | 
			
		||||
        -- ping each returned node before actually inserting them
 | 
			
		||||
        -- send pings in parallel, check whether ID is part of the returned IDs
 | 
			
		||||
        nsSnap <- readTVarIO nsSTM
 | 
			
		||||
        pingThreads <- mapM (async . checkReachability nsSnap) $ preds <> succs
 | 
			
		||||
        -- ToDo: exception handling, maybe log them
 | 
			
		||||
        -- filter out own node
 | 
			
		||||
        checkedNeighbours <- filter (/= toRemoteNodeState nsSnap) . catMaybes . rights <$> mapM waitCatch pingThreads
 | 
			
		||||
 | 
			
		||||
        atomically $ do
 | 
			
		||||
            newerNsSnap <- readTVar nsSTM
 | 
			
		||||
            writeTVar nsSTM $ addPredecessors preds . addSuccessors succs $ newerNsSnap
 | 
			
		||||
            writeTVar nsSTM $ addPredecessors checkedNeighbours . addSuccessors checkedNeighbours $ newerNsSnap
 | 
			
		||||
        pure ()
 | 
			
		||||
    checkReachability :: LocalNodeState -> RemoteNodeState -> IO (Maybe RemoteNodeState)
 | 
			
		||||
    checkReachability ns toCheck = do
 | 
			
		||||
        resp <- requestPing ns toCheck
 | 
			
		||||
        pure $ either (const Nothing) (\vss ->
 | 
			
		||||
            if toCheck `elem` vss then Just toCheck else Nothing
 | 
			
		||||
                       ) resp
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
-- periodically contact immediate successor and predecessor
 | 
			
		||||
-- If they respond, see whether there is a closer neighbour in between
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -613,6 +613,7 @@ data FediChordConf = FediChordConf
 | 
			
		|||
    , confIP             :: HostAddress6
 | 
			
		||||
    , confDhtPort        :: Int
 | 
			
		||||
    , confBootstrapNodes :: [(String, PortNumber)]
 | 
			
		||||
    --, confStabiliseInterval :: Int
 | 
			
		||||
    }
 | 
			
		||||
    deriving (Show, Eq)
 | 
			
		||||
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
| 
						 | 
				
			
			@ -154,7 +154,7 @@ spec = do
 | 
			
		|||
    describe "successors and predecessors do not disturb the ring characteristics of EpiChord operations (see #48)" $ do
 | 
			
		||||
        let
 | 
			
		||||
            emptyCache = initCache
 | 
			
		||||
            -- implicitly relies on kNieghbours to be <= 3
 | 
			
		||||
            -- implicitly relies on kNeighbours to be <= 3
 | 
			
		||||
            thisNid = toNodeID 1000
 | 
			
		||||
            thisNode = setNid thisNid <$> exampleLocalNode
 | 
			
		||||
            nid2 = toNodeID 1003
 | 
			
		||||
| 
						 | 
				
			
			
 | 
			
		|||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue