From ad1465c5fe50af1b9adf3db94b3f9a1c3725889d Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 26 May 2020 08:55:44 +0200 Subject: [PATCH 1/3] use global cache adding function --- src/Hash2Pub/DHTProtocol.hs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 7b07785..147fecc 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -203,14 +203,14 @@ queryIdLookupLoop cacheSnapshot ns targetID = do now <- getPOSIXTime newLCache <- foldM (\oldCache resp -> do let entriesToInsert = case queryResult <$> payload resp of - Just (FOUND result1) -> [addCacheEntryPure now (RemoteCacheEntry result1 now)] - Just (FORWARD resultset) -> addCacheEntryPure now <$> Set.elems resultset + Just (FOUND result1) -> [RemoteCacheEntry result1 now] + Just (FORWARD resultset) -> Set.elems resultset _ -> [] -- forward entries to global cache - forM_ entriesToInsert $ \entry -> atomically $ writeTQueue (cacheWriteQueue ns) entry + queueAddEntries entriesToInsert ns -- insert entries into local cache copy - pure $ foldl' ( - \oldLCache insertFunc -> insertFunc oldLCache + pure $ foldr' ( + addCacheEntryPure now ) oldCache entriesToInsert ) cacheSnapshot responses @@ -283,7 +283,7 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -queueAddEntries :: [RemoteCacheEntry] +queueAddEntries :: Foldable c => c RemoteCacheEntry -> LocalNodeState -> IO () queueAddEntries entries ns = do From 43eb04dfea8a88f8764ee733234d403aff3c79c9 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 26 May 2020 09:38:38 +0200 Subject: [PATCH 2/3] preliminary passing of bootstrap nodes in Main to fediChordJoin --- src/Hash2Pub/DHTProtocol.hs | 1 + src/Hash2Pub/FediChord.hs | 1 + src/Hash2Pub/FediChordTypes.hs | 7 ++++--- src/Hash2Pub/Main.hs | 4 +++- 4 files changed, 9 insertions(+), 4 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 147fecc..588e846 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -334,4 +334,5 @@ mkSendSocket dest destPort = do destAddr <- addrAddress <$> resolve (Just dest) (Just destPort) sendSock <- socket AF_INET6 Datagram defaultProtocol setSocketOption sendSock IPv6Only 1 + connect sendSock destAddr pure sendSock diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 540267c..c52e9f9 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -37,6 +37,7 @@ module Hash2Pub.FediChord ( , bsAsIpAddr , FediChordConf(..) , fediChordInit + , fediChordJoin , nodeStateInit , mkServerSocket , mkSendSocket diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 7ad09a9..7e3565d 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -419,9 +419,10 @@ byteStringToUInteger bs = sum $ parsedBytes 0 bs -- | configuration values used for initialising the FediChord DHT data FediChordConf = FediChordConf - { confDomain :: String - , confIP :: HostAddress6 - , confDhtPort :: Int + { confDomain :: String + , confIP :: HostAddress6 + , confDhtPort :: Int + , confBootstrapNodes :: [(String, PortNumber)] } deriving (Show, Eq) diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index 1956f64..4435f73 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -18,15 +18,17 @@ main = do -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode -- idea: list of bootstrapping nodes, try joining within a timeout + joinedState <- fediChordJoin thisNode $ head . confBootstrapNodes $ conf -- stop main thread from terminating during development getChar pure () readConfig :: IO FediChordConf readConfig = do - confDomainString : ipString : portString : _ <- getArgs + confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs pure $ FediChordConf { confDomain = confDomainString , confIP = toHostAddress6 . read $ ipString , confDhtPort = read portString + , confBootstrapNodes = [(bootstrapHost, read bootstrapPortString)] } From 702684b1a955e1ae23ea59d4b3bc44fde9a8d8ee Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 26 May 2020 11:01:11 +0200 Subject: [PATCH 3/3] split fediChordJoin into general purpose and bootstrapping part --- src/Hash2Pub/FediChord.hs | 37 +++++++++++++++++++++++-------------- src/Hash2Pub/Main.hs | 2 +- 2 files changed, 24 insertions(+), 15 deletions(-) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index c52e9f9..9cce7eb 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -38,6 +38,7 @@ module Hash2Pub.FediChord ( , FediChordConf(..) , fediChordInit , fediChordJoin + , fediChordBootstrapJoin , nodeStateInit , mkServerSocket , mkSendSocket @@ -110,11 +111,13 @@ nodeStateInit conf = do } pure initialState -fediChordJoin :: LocalNodeState -- ^ the local 'NodeState' - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a +-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed +-- for resolving the new node's position. +fediChordBootstrapJoin :: LocalNodeState -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordJoin ns (joinHost, joinPort) = do +fediChordBootstrapJoin ns (joinHost, joinPort) = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined sock <- mkSendSocket joinHost joinPort -- 1. get routed to placement of own ID until FOUND: @@ -133,18 +136,24 @@ fediChordJoin ns (joinHost, joinPort) = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache bootstrapResponse - -- get routed to the currently responsible node, based on the response - -- from the bootstrapping node - currentlyResponsible <- queryIdLookupLoop bootstrapCache ns $ getNid ns - -- do actual join - joinResult <- requestJoin currentlyResponsible ns - case joinResult of - Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible - Just joinedNS -> pure . Right $ joinedNS - + fediChordJoin bootstrapCache ns +-- | join a node to the DHT, using the provided cache snapshot for resolving the new +-- node's position. +fediChordJoin :: NodeCache -- ^ a snapshot of the NodeCache to + -- use for ID lookup + -> LocalNodeState -- ^ the local 'NodeState' + -> IO (Either String LocalNodeState) -- ^ the joined 'NodeState' after a + -- successful join, otherwise an error message +fediChordJoin cacheSnapshot ns = do + -- get routed to the currently responsible node, based on the response + -- from the bootstrapping node + currentlyResponsible <- queryIdLookupLoop cacheSnapshot ns $ getNid ns -- 2. then send a join to the currently responsible node - -- after successful join, finally add own node to the cache + joinResult <- requestJoin currentlyResponsible ns + case joinResult of + Nothing -> pure . Left $ "Error joining on " <> show currentlyResponsible + Just joinedNS -> pure . Right $ joinedNS -- | cache updater thread that waits for incoming NodeCache update instructions on diff --git a/src/Hash2Pub/Main.hs b/src/Hash2Pub/Main.hs index 4435f73..f1e6b29 100644 --- a/src/Hash2Pub/Main.hs +++ b/src/Hash2Pub/Main.hs @@ -18,7 +18,7 @@ main = do -- currently no masking is necessary, as there is nothing to clean up cacheWriterThread <- forkIO $ cacheWriter thisNode -- idea: list of bootstrapping nodes, try joining within a timeout - joinedState <- fediChordJoin thisNode $ head . confBootstrapNodes $ conf + joinedState <- fediChordBootstrapJoin thisNode $ head . confBootstrapNodes $ conf -- stop main thread from terminating during development getChar pure ()