diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 7b07785..588e846 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -203,14 +203,14 @@ queryIdLookupLoop cacheSnapshot ns targetID = do now <- getPOSIXTime newLCache <- foldM (\oldCache resp -> do let entriesToInsert = case queryResult <$> payload resp of - Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)] - Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset + Just (FOUND result1) -> [RemoteCacheEntry result1 now] + Just (FORWARD resultset) -> Set.elems resultset _ -> [] -- forward entries to global cache - forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry + queueAddEntries entriesToInsert ns -- insert entries into local cache copy - pure $ foldl' ( - \oldLCache insertFunc -> insertFunc oldLCache + pure $ foldr' ( + addCacheEntryPure now ) oldCache entriesToInsert ) cacheSnapshot responses @@ -283,7 +283,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -queueAddEntries :: [RemoteCacheEntry] +queueAddEntries :: Foldable c => c RemoteCacheEntry -> LocalNodeState -> IO () queueAddEntries entries ns = do @@ -334,4 +334,5 @@ mkSendSocket dest destPort = do destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) sendSock <- socket AF_INET6 Datagram defaultProtocol setSocketOption sendSock IPv6Only 1 + connect sendSock destAddr pure sendSock diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 540267c..9cce7eb 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -37,6 +37,8 @@ module Hash2Pub.FediChord ( , bsAsIpAddr , FediChordConf(..) , fediChordInit + , fediChordJoin + , fediChordBootstrapJoin , nodeStateInit , mkServerSocket , mkSendSocket @@ -109,11 +111,13 @@ nodeStateInit conf = do } pure initialState -fediChordJoin :: LocalNodeState -- ^ the local 'NodeState' - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a +-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed +-- for resolving the new node's position. +fediChordBootstrapJoin :: LocalNodeState -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordJoin ns (joinHost, joinPort) = do +fediChordBootstrapJoin ns (joinHost, joinPort) = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined sock <- mkSendSocket joinHost joinPort -- 1. get routed to placement of own ID until FOUND: @@ -132,18 +136,24 @@ fediChordJoin ns (joinHost, joinPort) = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache bootstrapResponse - -- get routed to the currently responsible node, based on the response - -- from the bootstrapping node - currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns - -- do actual join - joinResult <- requestJoin currentlyResponsible ns - case joinResult of - Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible - Just joinedNS -> pure . Right $ joinedNS - + fediChordJoin bootstrapCache ns +-- | join a node to the DHT, using the provided cache snapshot for resolving the new +-- node's position. +fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to + -- use for ID lookup + -> LocalNodeState -- ^ the local 'NodeState' + -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a + -- successful join, otherwise an error message +fediChordJoin cacheSnapshot ns = do + -- get routed to the currently responsible node, based on the response + -- from the bootstrapping node + currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns $ getNid ns -- 2. then send a join to the currently responsible node - -- after successful join, finally add own node to the cache + joinResult <- requestJoin currentlyResponsible ns + case joinResult of + Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible + Just joinedNS -> pure . Right $ joinedNS -- | cache updater thread that waits for incoming NodeCache update instructions on diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 7ad09a9..7e3565d 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -419,9 +419,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs -- | configuration values used for initialising the FediChord DHT data FediChordConf = FediChordConf - { confDomain :: String - , confIP :: HostAddress6 - , confDhtPort :: Int + { confDomain :: String + , confIP :: HostAddress6 + , confDhtPort :: Int + , confBootstrapNodes :: [(String, PortNumber)] } deriving (Show, Eq) diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index 1956f64..f1e6b29 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -18,15 +18,17 @@ main = do -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode -- idea: list of bootstrapping nodes, try joining within a timeout + joinedState <- fediChordBootstrapJoin thisNode $ head . confBootstrapNodes $ conf -- stop main thread from terminating during development getChar pure () readConfig :: IO FediChordConf readConfig = do - confDomainString : ipString : portString : _ <- getArgs + confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs pure $ FediChordConf { confDomain = confDomainString , confIP = toHostAddress6 . read $ ipString , confDhtPort = read portString + , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] }