Compare commits

...

3 commits

4 changed files with 38 additions and 24 deletions

View file

@ -203,14 +203,14 @@ queryIdLookupLoop cacheSnapshot ns targetID = do
now <- getPOSIXTime now <- getPOSIXTime
newLCache <- foldM (\oldCache resp -> do newLCache <- foldM (\oldCache resp -> do
let entriesToInsert = case queryResult <$> payload resp of let entriesToInsert = case queryResult <$> payload resp of
Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)] Just (FOUND result1) -> [RemoteCacheEntry result1 now]
Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset Just (FORWARD resultset) -> Set.elems resultset
_ -> [] _ -> []
-- forward entries to global cache -- forward entries to global cache
forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry queueAddEntries entriesToInsert ns
-- insert entries into local cache copy -- insert entries into local cache copy
pure $ foldl' ( pure $ foldr' (
\oldLCache insertFunc -> insertFunc oldLCache addCacheEntryPure now
) oldCache entriesToInsert ) oldCache entriesToInsert
) cacheSnapshot responses ) cacheSnapshot responses
@ -283,7 +283,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
queueAddEntries :: [RemoteCacheEntry] queueAddEntries :: Foldable c => c RemoteCacheEntry
-> LocalNodeState -> LocalNodeState
-> IO () -> IO ()
queueAddEntries entries ns = do queueAddEntries entries ns = do
@ -334,4 +334,5 @@ mkSendSocket dest destPort = do
destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) destAddr <- addrAddress <$> resolve (Just dest) (Just destPort)
sendSock <- socket AF_INET6 Datagram defaultProtocol sendSock <- socket AF_INET6 Datagram defaultProtocol
setSocketOption sendSock IPv6Only 1 setSocketOption sendSock IPv6Only 1
connect sendSock destAddr
pure sendSock pure sendSock

View file

@ -37,6 +37,8 @@ module Hash2Pub.FediChord (
, bsAsIpAddr , bsAsIpAddr
, FediChordConf(..) , FediChordConf(..)
, fediChordInit , fediChordInit
, fediChordJoin
, fediChordBootstrapJoin
, nodeStateInit , nodeStateInit
, mkServerSocket , mkServerSocket
, mkSendSocket , mkSendSocket
@ -109,11 +111,13 @@ nodeStateInit conf = do
} }
pure initialState pure initialState
fediChordJoin :: LocalNodeState -- ^ the local 'NodeState' -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node -- for resolving the new node's position.
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a fediChordBootstrapJoin :: LocalNodeState -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordJoin ns (joinHost, joinPort) = do fediChordBootstrapJoin ns (joinHost, joinPort) = do
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined -- can be invoked multiple times with all known bootstrapping nodes until successfully joined
sock <- mkSendSocket joinHost joinPort sock <- mkSendSocket joinHost joinPort
-- 1. get routed to placement of own ID until FOUND: -- 1. get routed to placement of own ID until FOUND:
@ -132,18 +136,24 @@ fediChordJoin ns (joinHost, joinPort) = do
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
) )
initCache bootstrapResponse initCache bootstrapResponse
-- get routed to the currently responsible node, based on the response fediChordJoin bootstrapCache ns
-- from the bootstrapping node
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns
-- do actual join
joinResult <- requestJoin currentlyResponsible ns
case joinResult of
Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible
Just joinedNS -> pure . Right $ joinedNS
-- | join a node to the DHT, using the provided cache snapshot for resolving the new
-- node's position.
fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to
-- use for ID lookup
-> LocalNodeState -- ^ the local 'NodeState'
-> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordJoin cacheSnapshot ns = do
-- get routed to the currently responsible node, based on the response
-- from the bootstrapping node
currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns $ getNid ns
-- 2. then send a join to the currently responsible node -- 2. then send a join to the currently responsible node
-- after successful join, finally add own node to the cache joinResult <- requestJoin currentlyResponsible ns
case joinResult of
Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible
Just joinedNS -> pure . Right $ joinedNS
-- | cache updater thread that waits for incoming NodeCache update instructions on -- | cache updater thread that waits for incoming NodeCache update instructions on

View file

@ -419,9 +419,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs
-- | configuration values used for initialising the FediChord DHT -- | configuration values used for initialising the FediChord DHT
data FediChordConf = FediChordConf data FediChordConf = FediChordConf
{ confDomain :: String { confDomain :: String
, confIP :: HostAddress6 , confIP :: HostAddress6
, confDhtPort :: Int , confDhtPort :: Int
, confBootstrapNodes :: [(String, PortNumber)]
} }
deriving (Show, Eq) deriving (Show, Eq)

View file

@ -18,15 +18,17 @@ main = do
-- currently no masking is necessary, as there is nothing to clean up -- currently no masking is necessary, as there is nothing to clean up
cacheWriterThread <- forkIO $ cacheWriter thisNode cacheWriterThread <- forkIO $ cacheWriter thisNode
-- idea: list of bootstrapping nodes, try joining within a timeout -- idea: list of bootstrapping nodes, try joining within a timeout
joinedState <- fediChordBootstrapJoin thisNode $ head . confBootstrapNodes $ conf
-- stop main thread from terminating during development -- stop main thread from terminating during development
getChar getChar
pure () pure ()
readConfig :: IO FediChordConf readConfig :: IO FediChordConf
readConfig = do readConfig = do
confDomainString : ipString : portString : _ <- getArgs confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
pure $ FediChordConf { pure $ FediChordConf {
confDomain = confDomainString confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString , confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString , confDhtPort = read portString
, confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)]
} }