diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 588e846..7b07785 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -203,14 +203,14 @@ queryIdLookupLoop cacheSnapshot ns targetID = do now <- getPOSIXTime newLCache <- foldM (\oldCache resp -> do let entriesToInsert = case queryResult <$> payload resp of - Just (FOUND result1) -> [RemoteCacheEntry result1 now] - Just (FORWARD resultset) -> Set.elems resultset + Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)] + Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset _ -> [] -- forward entries to global cache - queueAddEntries entriesToInsert ns + forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry -- insert entries into local cache copy - pure $ foldr' ( - addCacheEntryPure now + pure $ foldl' ( + \oldLCache insertFunc -> insertFunc oldLCache ) oldCache entriesToInsert ) cacheSnapshot responses @@ -283,7 +283,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -queueAddEntries :: Foldable c => c RemoteCacheEntry +queueAddEntries :: [RemoteCacheEntry] -> LocalNodeState -> IO () queueAddEntries entries ns = do @@ -334,5 +334,4 @@ mkSendSocket dest destPort = do destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) sendSock <- socket AF_INET6 Datagram defaultProtocol setSocketOption sendSock IPv6Only 1 - connect sendSock destAddr pure sendSock diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 9cce7eb..540267c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -37,8 +37,6 @@ module Hash2Pub.FediChord ( , bsAsIpAddr , FediChordConf(..) , fediChordInit - , fediChordJoin - , fediChordBootstrapJoin , nodeStateInit , mkServerSocket , mkSendSocket @@ -111,13 +109,11 @@ nodeStateInit conf = do } pure initialState --- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed --- for resolving the new node's position. -fediChordBootstrapJoin :: LocalNodeState -- ^ the local 'NodeState' - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a +fediChordJoin :: LocalNodeState -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordBootstrapJoin ns (joinHost, joinPort) = do +fediChordJoin ns (joinHost, joinPort) = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined sock <- mkSendSocket joinHost joinPort -- 1. get routed to placement of own ID until FOUND: @@ -136,24 +132,18 @@ fediChordBootstrapJoin ns (joinHost, joinPort) = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache bootstrapResponse - fediChordJoin bootstrapCache ns + -- get routed to the currently responsible node, based on the response + -- from the bootstrapping node + currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns + -- do actual join + joinResult <- requestJoin currentlyResponsible ns + case joinResult of + Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible + Just joinedNS -> pure . Right $ joinedNS + --- | join a node to the DHT, using the provided cache snapshot for resolving the new --- node's position. -fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to - -- use for ID lookup - -> LocalNodeState -- ^ the local 'NodeState' - -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a - -- successful join, otherwise an error message -fediChordJoin cacheSnapshot ns = do - -- get routed to the currently responsible node, based on the response - -- from the bootstrapping node - currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns $ getNid ns -- 2. then send a join to the currently responsible node - joinResult <- requestJoin currentlyResponsible ns - case joinResult of - Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible - Just joinedNS -> pure . Right $ joinedNS + -- after successful join, finally add own node to the cache -- | cache updater thread that waits for incoming NodeCache update instructions on diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 7e3565d..7ad09a9 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -419,10 +419,9 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs -- | configuration values used for initialising the FediChord DHT data FediChordConf = FediChordConf - { confDomain :: String - , confIP :: HostAddress6 - , confDhtPort :: Int - , confBootstrapNodes :: [(String, PortNumber)] + { confDomain :: String + , confIP :: HostAddress6 + , confDhtPort :: Int } deriving (Show, Eq) diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index f1e6b29..1956f64 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -18,17 +18,15 @@ main = do -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode -- idea: list of bootstrapping nodes, try joining within a timeout - joinedState <- fediChordBootstrapJoin thisNode $ head . confBootstrapNodes $ conf -- stop main thread from terminating during development getChar pure () readConfig :: IO FediChordConf readConfig = do - confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs + confDomainString : ipString : portString : _ <- getArgs pure $ FediChordConf { confDomain = confDomainString , confIP = toHostAddress6 . read $ ipString , confDhtPort = read portString - , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] }