From 2ee40a7f64c89433996db458a7571140b530a1fd Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 19 Aug 2020 15:49:39 +0200 Subject: [PATCH 01/18] start working on the experiment runner #59 --- Hash2Pub.cabal | 16 +++++++++++++++- app/Experiment.hs | 3 +++ 2 files changed, 18 insertions(+), 1 deletion(-) create mode 100644 app/Experiment.hs diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 5ffff0d..2cc2d84 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -46,7 +46,7 @@ category: Network extra-source-files: CHANGELOG.md common deps - build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types + build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays ghc-options: -Wall @@ -93,6 +93,20 @@ executable Hash2Pub ghc-options: -threaded +executable Experiment + -- experiment runner + import: deps + + build-depends: Hash2Pub + + main-is: Experiment.hs + + hs-source-dirs: app + + default-language: Haskell2010 + + ghc-options: -threaded + test-suite Hash2Pub-test -- Test dependencies. diff --git a/app/Experiment.hs b/app/Experiment.hs new file mode 100644 index 0000000..c7abbcb --- /dev/null +++ b/app/Experiment.hs @@ -0,0 +1,3 @@ +module Main where + +main = putStrLn "This gives us ALL the insights!" 
From 2548b6a507c249462c68a519285c79d17429c344 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 20 Aug 2020 11:49:23 +0200 Subject: [PATCH 02/18] automatically subscribe when publishing to a tag --- src/Hash2Pub/PostService.hs | 26 +++++++++++++++++++------- 1 file changed, 19 insertions(+), 7 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index a871343..0eb6e00 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -17,7 +17,7 @@ import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM, forM_, forever) +import Control.Monad (foldM, forM, forM_, forever, when, void) import Control.Monad.IO.Class (liftIO) import Control.Monad.STM import Data.Bifunctor @@ -150,7 +150,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- delivery endpoint of newly published posts of the relay's instance + -- delivery endpoint at responsible relay for delivering posts of $tag for distribution :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text @@ -191,7 +191,7 @@ relayInbox serv tag posts = do if responsible then pure () else - (throwError $ err410 { errBody = "Relay is not responsible for this tag"}) + throwError $ err410 { errBody = "Relay is not responsible for this tag"} broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag maybe -- if noone subscribed to the tag, nothing needs to be done @@ -396,7 +396,8 @@ clientDeliverSubscriptions serv 
fromNode fromKey toKey (toHost, toPort) = do maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead --- | Subscribe the client to the given hashtag. On success it returns the given lease time. +-- | Subscribe the client to the given hashtag. On success it returns the given lease time, +-- but also records the subscription in its own data structure. clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) clientSubscribeTo serv tag = do lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) @@ -413,7 +414,9 @@ clientSubscribeTo serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doSubscribe newRes False Left err -> pure . Left . show $ err - Right lease -> pure . Right $ lease + Right lease -> do + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (genKeyID . Txt.unpack $ tag) (fromInteger lease) + pure . Right $ lease ) lookupResponse @@ -435,7 +438,9 @@ clientUnsubscribeFrom serv tag = do newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) doUnsubscribe newRes False Left err -> pure . Left . show $ err - Right _ -> pure . Right $ () + Right _ -> do + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (genKeyID . Txt.unpack $ tag) + pure . Right $ () ) lookupResponse @@ -580,7 +585,14 @@ processIncomingPosts serv = forever $ do -- TODO: keep track of maximum retries _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) - Right yay -> putStrLn $ "Yay! " <> show yay + Right yay -> do + putStrLn $ "Yay! " <> show yay + -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags + now <- getPOSIXTime + subscriptionStatus <- HMap.lookup (genKeyID . 
Txt.unpack $ tag) <$> readTVarIO (ownSubscriptions serv) + -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag + when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $ + void $ clientSubscribeTo serv tag -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID From 24088581fee3bc609f82811b380c961a4e0624e2 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 20 Aug 2020 15:58:35 +0200 Subject: [PATCH 03/18] bump nixpkgs revision --- default.nix | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/default.nix b/default.nix index 4e77a05..cea4aa3 100644 --- a/default.nix +++ b/default.nix @@ -14,7 +14,7 @@ let name = "nixpkgs-pinned"; url = https://github.com/NixOS/nixpkgs/; ref = "refs/heads/release-20.03"; - rev = "076c67fdea6d0529a568c7d0e0a72e6bc161ecf5"; + rev = "de3780b937d2984f9b5e20d191f23be4f857b3aa"; }) { # Pass no config for purity config = {}; From 32734102cdbd23ca5e15d682f9c6b73ad7ac9018 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 20 Aug 2020 18:13:50 +0200 Subject: [PATCH 04/18] improve documentation of clientPublishPost --- src/Hash2Pub/PostService.hs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 0eb6e00..2e107b6 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -450,11 +450,11 @@ clientUnsubscribeFrom serv tag = do -- the post to the responsible relays. -- As the initial publishing isn't done by a specific relay (but *to* a specific relay -- instead), the function does *not* take a PostService as argument. 
-clientPublishPost :: HTTP.Manager -- for better performance, a shared HTTP manager has to be provided - -> String -- hostname - -> Int -- port - -> PostContent -- post content - -> IO (Either String ()) -- error or success +clientPublishPost :: HTTP.Manager -- ^ for better performance, a shared HTTP manager has to be provided + -> String -- ^ hostname + -> Int -- ^ port + -> PostContent -- ^ post content + -> IO (Either String ()) -- ^ error or success clientPublishPost httpman hostname port postC = do resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port "")) pure . bimap show (const ()) $ resp From f330ff1070580d042d1bc15e52be3fac4f59c601 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 20 Aug 2020 18:14:23 +0200 Subject: [PATCH 05/18] successful post publishing with MonadState and random relay selection --- app/Experiment.hs | 43 ++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 42 insertions(+), 1 deletion(-) diff --git a/app/Experiment.hs b/app/Experiment.hs index c7abbcb..51b8e88 100644 --- a/app/Experiment.hs +++ b/app/Experiment.hs @@ -1,3 +1,44 @@ +{-# LANGUAGE OverloadedStrings #-} + module Main where -main = putStrLn "This gives us ALL the insights!" 
+import System.Random +import Control.Concurrent +import Control.Monad (forM_) +import Control.Monad.State.Class +import Control.Monad.State.Strict (evalStateT) +import Control.Monad.IO.Class +import qualified Network.HTTP.Client as HTTP + +import Hash2Pub.PostService (clientPublishPost, Hashtag) + +-- placeholder post data definition + +tagsToPostTo = [ "JustSomeTag", "WantAnotherTag234", "HereWeGoAgain", "Oyä", "通信端末" ] + +knownRelays :: [(String, Int)] +knownRelays = + [ ("animalliberation.social", 3342) + , ("hostux.social", 3343) + , ("social.diskseven.com", 3344) + , ("social.imirhil.fr", 3345) + ] + +main :: IO () +main = do + -- initialise HTTP manager + httpMan <- HTTP.newManager HTTP.defaultManagerSettings + -- initialise RNG + let initRGen = mkStdGen 12 + -- cycle through tags and post to a random instance + evalStateT (forM_ (cycle tagsToPostTo) $ publishPostRandom httpMan) initRGen + -- wait for a specified time + +publishPostRandom :: (RandomGen g, MonadIO m, MonadState g m) => HTTP.Manager -> Hashtag -> m () +publishPostRandom httpman tag = do + index <- state $ randomR (0, length knownRelays - 1) + let (pubHost, pubPort) = knownRelays !! index + _ <- liftIO . 
forkIO $ do + postResult <- liftIO $ clientPublishPost httpman pubHost pubPort ("foobar #" <> tag) + either putStrLn (const $ pure ()) postResult + liftIO $ threadDelay 500 From 5511026c8deef79bc8dd5c3377ffd191c6742cea Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 21 Aug 2020 14:40:29 +0200 Subject: [PATCH 06/18] reduce logging verbosity --- src/Hash2Pub/DHTProtocol.hs | 3 --- src/Hash2Pub/FediChord.hs | 2 +- src/Hash2Pub/PostService.hs | 4 ++-- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index bd7953f..033f248 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -259,7 +259,6 @@ handleIncomingRequest :: Service s (RealNodeSTM s) -> SockAddr -- ^ source address of the request -> IO () handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do - putStrLn $ "handling incoming request: " <> show msgSet ns <- readTVarIO nsSTM -- add nodestate to cache now <- getPOSIXTime @@ -314,7 +313,6 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do -- | execute a key ID lookup on local cache and respond with the result respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondQueryID nsSTM msgSet = do - putStrLn "responding to a QueryID request" -- this message cannot be split reasonably, so just -- consider the first payload let @@ -749,7 +747,6 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do let msgComplete = msgIncomplete randomID requests = serialiseMessage sendMessageSize msgComplete - putStrLn $ "sending request message " <> show msgComplete -- create a queue for passing received response messages back, even after a timeout responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets -- start sendAndAck with timeout diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 15563de..2116ca9 100644 --- a/src/Hash2Pub/FediChord.hs +++ 
b/src/Hash2Pub/FediChord.hs @@ -716,7 +716,7 @@ fediMessageHandler sendQ recvQ nsSTM = do instance DHT (RealNodeSTM s) where lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring - forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring + forceLookupKey nodeSTM keystring = (putStrLn $ "forced responsibility lookup of #" <> keystring) >> (updateLookupCache nodeSTM $ genKeyID keystring) -- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState isResponsibleFor nodeSTM key = do node <- readTVarIO nodeSTM diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 2e107b6..348c9a1 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -585,8 +585,8 @@ processIncomingPosts serv = forever $ do -- TODO: keep track of maximum retries _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) - Right yay -> do - putStrLn $ "Yay! " <> show yay + Right _ -> do + -- TODO: stats -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags now <- getPOSIXTime subscriptionStatus <- HMap.lookup (genKeyID . Txt.unpack $ tag) <$> readTVarIO (ownSubscriptions serv) From 75c1932ef67a38adbeaa0c9c32172b080c0b78fa Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 21 Aug 2020 23:47:42 +0200 Subject: [PATCH 07/18] send fetchable post URIs as ID --- src/Hash2Pub/PostService.hs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 348c9a1..c7300db 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -571,12 +571,13 @@ processIncomingPosts serv = forever $ do -- blocks until available -- TODO: process multiple in parallel (tag, pID, pContent) <- atomically . 
readTQueue $ relayInQueue serv + let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) case lookupRes of -- no vserver active => wait and retry Nothing -> threadDelay $ 10 * 10^6 Just (responsibleHost, responsiblePort) -> do - resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) + resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) case resp of Left err -> do putStrLn $ "Error: " <> show err From c3b1aad1c76f54ef471d5edfed57abb2f6e8fd2b Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 21 Aug 2020 23:55:20 +0200 Subject: [PATCH 08/18] abstract away the hashtag -> NodeID conversion --- src/Hash2Pub/PostService.hs | 29 +++++++++++++++++------------ 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index c7300db..fe013a0 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -187,7 +187,7 @@ relayInbox serv tag posts = do -- skip checking whether the post actually contains the tag, just drop full post postIDs = head . Txt.splitOn "," <$> Txt.lines posts -- if tag is not in own responsibility, return a 410 Gone - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag) if responsible then pure () else @@ -221,7 +221,7 @@ subscriptionDelivery serv senderID subList = do -- not-handled tag occurs, this results in a single large transaction. -- Hopefully the performance isn't too bad. res <- liftIO . 
atomically $ (foldM (\_ tag' -> do - responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag') + responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag') if responsible then processTag (subscribers serv) tag' else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay") @@ -295,7 +295,7 @@ tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery serv hashtag posts = do let postIDs = Txt.lines posts subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv - if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions) + if isJust (HMap.lookup (hashtagToId hashtag) subscriptions) then -- TODO: increase a counter/ statistics for received posts of this tag liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) else -- silently drop posts from unsubscribed tags @@ -304,7 +304,7 @@ tagDelivery serv hashtag posts = do tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe serv hashtag origin = do - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible -- GONE if not responsible then throwError err410 { errBody = "not responsible for this tag" } @@ -323,7 +323,7 @@ tagSubscribe serv hashtag origin = do tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe serv hashtag origin = do - responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . 
Txt.unpack $ hashtag) + responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag) if not responsible -- GONE if not responsible then throwError err410 { errBody = "not responsible for this tag" } @@ -385,7 +385,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do Right _ -> do atomically $ modifyTVar' (subscribers serv) $ \tagMap -> - foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) + foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags) pure . Right $ () where channelGetAll :: TChan a -> STM [a] @@ -415,7 +415,7 @@ clientSubscribeTo serv tag = do doSubscribe newRes False Left err -> pure . Left . show $ err Right lease -> do - atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (genKeyID . Txt.unpack $ tag) (fromInteger lease) + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease) pure . Right $ lease ) lookupResponse @@ -439,7 +439,7 @@ clientUnsubscribeFrom serv tag = do doUnsubscribe newRes False Left err -> pure . Left . show $ err Right _ -> do - atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (genKeyID . Txt.unpack $ tag) + atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag) pure . Right $ () ) lookupResponse @@ -497,7 +497,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do broadcastChan <- newBroadcastTChan tagOutChan <- dupTChan broadcastChan newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime) - writeTVar tagMapSTM $ addRMapEntry (genKeyID . 
Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap + writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap pure tagOutChan Just (foundSubMapSTM, broadcastChan, _) -> do -- otherwise use the existing subscriber map @@ -525,7 +525,7 @@ deleteSubscription tagMapSTM tag subscriber = do -- if there are no subscriptions for the tag anymore, remove its -- data sttructure altogether if HMap.null newSubMap - then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap + then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap -- otherwise just remove the subscription of that node else writeTVar foundSubMapSTM newSubMap @@ -546,13 +546,18 @@ getTagBroadcastChannel serv tag = do -- | look up the subscription data of a tag lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a -lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag) +lookupTagSubscriptions tag = rMapLookup (hashtagToId tag) -- normalise the unicode representation of a string to NFC normaliseTag :: Txt.Text -> Txt.Text normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict + +-- | convert a hashtag to its representation on the DHT +hashtagToId :: Hashtag -> NodeID +hashtagToId = genKeyID . Txt.unpack + -- | define how to convert all showable types to PlainText -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap @@ -590,7 +595,7 @@ processIncomingPosts serv = forever $ do -- TODO: stats -- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags now <- getPOSIXTime - subscriptionStatus <- HMap.lookup (genKeyID . 
Txt.unpack $ tag) <$> readTVarIO (ownSubscriptions serv) + subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv) -- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $ void $ clientSubscribeTo serv tag From 2b418189a61137ec29dfa8bc1657b95f4f93d47f Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 23 Aug 2020 12:06:26 +0200 Subject: [PATCH 09/18] use hard-coded defaults for DHT request timeout and retries --- src/Hash2Pub/DHTProtocol.hs | 29 ++++++++++++++++++----------- src/Hash2Pub/FediChord.hs | 2 +- 2 files changed, 19 insertions(+), 12 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 033f248..a194dbb 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -490,7 +490,7 @@ requestJoin toJoinOn ownStateSTM = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState) bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do -- extract own state for getting request information - responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock + responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock (cacheInsertQ, joinedState) <- atomically $ do stateSnap <- readTVar ownStateSTM let @@ -586,7 +586,7 @@ sendQueryIdMessages targetID ns lParam targets = do srcAddr <- confIP . 
nodeConfig <$> readTVarIO (parentRealNode ns) -- ToDo: make attempts and timeout configurable queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( - sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) + sendRequestTo (lookupMessage targetID ns Nothing) )) targets -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: exception handling, maybe log them @@ -628,7 +628,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node requestStabilise ns neighbour = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) - responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid -> + responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -672,7 +672,7 @@ requestLeave ns doMigration target = do , leavePredecessors = predecessors ns , leaveDoMigration = doMigration } - responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid -> + responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -697,7 +697,7 @@ requestPing ns target = do srcAddr <- confIP . 
nodeConfig <$> readTVarIO (parentRealNode ns) responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (\sock -> do - resp <- sendRequestTo 5000 3 (\rid -> + resp <- sendRequestTo (\rid -> Request { requestID = rid , sender = toRemoteNodeState ns @@ -733,15 +733,22 @@ requestPing ns target = do ) responses +-- | 'sendRequestToWithParams' with default timeout and retries already specified. +-- Generic function for sending a request over a connected socket and collecting the response. +-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout. +sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID + -> Socket -- ^ connected socket to use for sending + -> IO (Set.Set FediChordMessage) -- ^ responses +sendRequestTo = sendRequestToWithParams 5000 3 -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. 
-sendRequestTo :: Int -- ^ timeout in seconds - -> Int -- ^ number of retries - -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID - -> Socket -- ^ connected socket to use for sending - -> IO (Set.Set FediChordMessage) -- ^ responses -sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do +sendRequestToWithParams :: Int -- ^ timeout in seconds + -> Int -- ^ number of retries + -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID + -> Socket -- ^ connected socket to use for sending + -> IO (Set.Set FediChordMessage) -- ^ responses +sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do -- give the message a random request ID randomID <- randomRIO (0, 2^32-1) let diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 2116ca9..54c5e9a 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -225,7 +225,7 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( -- Initialise an empty cache only with the responses from a bootstrapping node - fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) + fmap Right . sendRequestTo (lookupMessage targetID ns Nothing) ) `catch` (\e -> pure . 
Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) From cd8ea0760007602e58e3d4b6aff253fec5d235a9 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 23 Aug 2020 13:04:58 +0200 Subject: [PATCH 10/18] bugfix: make unjoined nodes consider all IDs to be their responsibility --- src/Hash2Pub/DHTProtocol.hs | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index a194dbb..69995bd 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -106,9 +106,6 @@ queryLocalCache ownState nCache lBestNodes targetID -- the closest succeeding node (like with the p initiated parallel queries | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache where - ownID = getNid ownState - preds = predecessors ownState - closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache @@ -433,7 +430,9 @@ respondJoin nsSTM msgSet = do let aRequestPart = Set.elemAt 0 msgSet senderNS = sender aRequestPart - responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS) + -- if not joined yet, attract responsibility for + -- all keys to make bootstrapping possible + responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap) thisNodeResponsible (FOUND _) = True thisNodeResponsible (FORWARD _) = False -- check whether the joining node falls into our responsibility From 4ba592d8a2495d845a8af25a8ad44b95b3e00072 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 23 Aug 2020 15:21:24 +0200 Subject: [PATCH 11/18] bugfix: DHT request timeout unit is milliseconds --- src/Hash2Pub/DHTProtocol.hs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 69995bd..b309b39 
100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -756,7 +756,7 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do -- create a queue for passing received response messages back, even after a timeout responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets -- start sendAndAck with timeout - attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests + attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. recvdParts <- atomically $ flushTBQueue responseQ pure $ Set.fromList recvdParts From 3bd4cb667db2ee4c94c2537e958cfd29e6819f72 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 24 Aug 2020 10:02:45 +0200 Subject: [PATCH 12/18] explicitly pass socket in send-receive-loop --- src/Hash2Pub/DHTProtocol.hs | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index b309b39..e22834a 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -742,7 +742,7 @@ sendRequestTo = sendRequestToWithParams 5000 3 -- | Generic function for sending a request over a connected socket and collecting the response. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. 
-sendRequestToWithParams :: Int -- ^ timeout in seconds +sendRequestToWithParams :: Int -- ^ timeout in milliseconds -> Int -- ^ number of retries -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID -> Socket -- ^ connected socket to use for sending @@ -765,19 +765,20 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do -> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> IO () - sendAndAck responseQueue sock remainingSends = do + sendAndAck responseQueue sock' remainingSends = do sendMany sock $ Map.elems remainingSends -- if all requests have been acked/ responded to, return prematurely - recvLoop responseQueue remainingSends Set.empty Nothing - recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses + recvLoop sock' responseQueue remainingSends Set.empty Nothing + recvLoop :: Socket + -> TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Set.Set Integer -- ^ already received response part numbers -> Maybe Integer -- ^ total number of response parts if already known -> IO () - recvLoop responseQueue remainingSends' receivedPartNums totalParts = do + recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do -- 65535 is maximum length of UDP packets, as long as -- no IPv6 jumbograms are used - response <- deserialiseMessage <$> recv sock 65535 + response <- deserialiseMessage <$> recv sock' 65535 case response of Right msg@Response{} -> do atomically $ writeTBQueue responseQueue msg @@ -787,9 +788,9 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do newReceivedParts = Set.insert (part msg) receivedPartNums if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts then 
pure () - else recvLoop responseQueue newRemaining receivedPartNums newTotalParts + else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts -- drop errors and invalid messages - Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts + Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache From 6c5e40f8ad78cc9019a9001cdb2a522180eaa0bd Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 24 Aug 2020 15:28:06 +0200 Subject: [PATCH 13/18] fix wrong passing of arguments in receive-loop part checking --- src/Hash2Pub/DHTProtocol.hs | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index e22834a..fac5a3f 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -766,14 +766,14 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> IO () sendAndAck responseQueue sock' remainingSends = do - sendMany sock $ Map.elems remainingSends + sendMany sock' $ Map.elems remainingSends -- if all requests have been acked/ responded to, return prematurely recvLoop sock' responseQueue remainingSends Set.empty Nothing recvLoop :: Socket -> TBQueue FediChordMessage -- ^ the queue for putting in the received responses -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Set.Set Integer -- ^ already received response part numbers - -> Maybe Integer -- ^ total number of response parts if already known + -> Maybe Integer -- ^ total number of response parts if already known -> IO () recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do -- 65535 is maximum length of UDP packets, as long as @@ -786,10 +786,11 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do 
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts newRemaining = Map.delete (part msg) remainingSends' newReceivedParts = Set.insert (part msg) receivedPartNums - if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts + if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts then pure () - else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts + else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts -- drop errors and invalid messages + Right Request{} -> pure () -- expecting a response, not a request Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts From b23201a49c1915ec573a443040cbd30e977f3cda Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 25 Aug 2020 12:51:33 +0200 Subject: [PATCH 14/18] Make key lookups fail after request exhaustion instead of providing default Returning the own node as a default does not make sense in all contexts: Especially for bootstrap joining this can be harmful, so signalling instead that the lookup failed makes distinguishing on a case by case basis possible. 
Also contributes to #57 --- src/Hash2Pub/DHTProtocol.hs | 17 +++++---- src/Hash2Pub/FediChord.hs | 72 ++++++++++++++++++++++--------------- 2 files changed, 53 insertions(+), 36 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index fac5a3f..1cce94d 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -49,6 +49,8 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_, void, when) +import Control.Monad.IO.Class (MonadIO(..)) +import Control.Monad.Except (MonadError(..), runExceptT) import qualified Data.ByteString as BS import Data.Either (rights) import Data.Foldable (foldl', foldr') @@ -533,9 +535,10 @@ requestJoin toJoinOn ownStateSTM = do -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node +requestQueryID :: (MonadIO m, MonadError String m) + => LocalNodeState s -- ^ NodeState of the querying node -> NodeID -- ^ target key ID to look up - -> IO RemoteNodeState -- ^ the node responsible for handling that key + -> m RemoteNodeState -- ^ the node responsible for handling that key -- 1. do a local lookup for the l closest nodes -- 2. create l sockets -- 3. send a message async concurrently to all l nodes @@ -543,23 +546,23 @@ requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -- TODO: deal with lookup failures requestQueryID ns targetID = do - firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns + firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns -- TODO: make maxAttempts configurable queryIdLookupLoop firstCacheSnapshot ns 50 targetID -- | like 'requestQueryID, but allows passing of a custom cache, e.g. 
for joining -queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState +queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState -- return node itself as default fallback value against infinite recursion. -- TODO: consider using an Either instead of a default value -queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns +queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts" queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID -- FOUND can only be returned if targetID is owned by local node case localResult of FOUND thisNode -> pure thisNode FORWARD nodeSet -> do - responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) - now <- getPOSIXTime + responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) + now <- liftIO getPOSIXTime -- check for a FOUND and return it case responseEntries of FOUND foundNode -> pure foundNode diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 54c5e9a..15cee10 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -166,6 +166,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do currentlyResponsible <- liftEither lookupResp liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. 
then send a join to the currently responsible node + liftIO $ putStrLn "send a bootstrap Join" joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM liftEither joinResult @@ -244,26 +245,24 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset ) initCache resp - currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns - pure $ Right currentlyResponsible + currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns + pure currentlyResponsible -- | join a node to the DHT using the global node cache -- node's position. -fediChordVserverJoin :: Service s (RealNodeSTM s) +fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeStateSTM s -- ^ the local 'NodeState' - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a + -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordVserverJoin nsSTM = do - ns <- readTVarIO nsSTM + ns <- liftIO $ readTVarIO nsSTM -- 1. get routed to the currently responsible node currentlyResponsible <- requestQueryID ns $ getNid ns - putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) + liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) -- 2. then send a join to the currently responsible node - joinResult <- requestJoin currentlyResponsible nsSTM - case joinResult of - Left err -> pure . Left $ "Error joining on " <> err - Right joinedNS -> pure . 
Right $ joinedNS + joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM + liftEither joinResult fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave ns = do @@ -323,7 +322,7 @@ joinOnNewEntriesThread nsSTM = loop pure () -- otherwise try joining FORWARD _ -> do - joinResult <- fediChordVserverJoin nsSTM + joinResult <- runExceptT $ fediChordVserverJoin nsSTM either -- on join failure, sleep and retry -- TODO: make delay configurable @@ -504,18 +503,26 @@ stabiliseThread nsSTM = forever $ do -- try looking up additional neighbours if list too short forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addPredecessors [nextEntry] latestNs + nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addPredecessors [entry] latestNs + ) + nextEntry ) forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM - nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') - atomically $ do - latestNs <- readTVar nsSTM - writeTVar nsSTM $ addSuccessors [nextEntry] latestNs + nextEntry <- runExceptT . requestQueryID ns' $ succ . 
getNid $ lastDef (toRemoteNodeState ns') (successors ns') + either + (const $ pure ()) + (\entry -> atomically $ do + latestNs <- readTVar nsSTM + writeTVar nsSTM $ addSuccessors [entry] latestNs + ) + nextEntry ) newNs <- readTVarIO nsSTM @@ -638,7 +645,7 @@ requestMapPurge :: MVar RequestMap -> IO () requestMapPurge mapVar = forever $ do rMapState <- takeMVar mapVar now <- getPOSIXTime - putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> + putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) -> now - ts < responsePurgeAge ) rMapState threadDelay $ round responsePurgeAge * 2 * 10^6 @@ -757,7 +764,7 @@ getKeyResponsibility nodeSTM lookupKey = do -- new entry. -- If no vserver is active in the DHT, 'Nothing' is returned. updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) -updateLookupCache nodeSTM lookupKey = do +updateLookupCache nodeSTM keyToLookup = do (node, lookupSource) <- atomically $ do node <- readTVar nodeSTM let firstVs = headMay (vservers node) @@ -767,18 +774,25 @@ updateLookupCache nodeSTM lookupKey = do pure (node, lookupSource) maybe (do -- if no local node available, delete cache entry and return Nothing - atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey + atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup pure Nothing ) (\n -> do -- start a lookup from the node, update the cache with the lookup result and return it - newResponsible <- requestQueryID n lookupKey - let newEntry = (getDomain newResponsible, getServicePort newResponsible) - now <- getPOSIXTime - -- atomic update against lost updates - atomically $ modifyTVar' (lookupCacheSTM node) $ - Map.insert lookupKey (CacheEntry False newEntry now) - pure $ Just newEntry + -- TODO: better retry management, because having no vserver joined yet should + -- be treated differently than other reasons for not getting a result. 
+ newResponsible <- runExceptT $ requestQueryID n keyToLookup + either + (const $ pure Nothing) + (\result -> do + let newEntry = (getDomain result, getServicePort result) + now <- getPOSIXTime + -- atomic update against lost updates + atomically $ modifyTVar' (lookupCacheSTM node) $ + Map.insert keyToLookup (CacheEntry False newEntry now) + pure $ Just newEntry + ) + newResponsible ) lookupSource From fc8aa3e330280198d6a6a084f857d68ea3c9a54c Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 25 Aug 2020 22:01:01 +0200 Subject: [PATCH 15/18] bugfix: properly process QueryID responses so FOUND is conserved fixes proper discovery of announced responsibility by FOUND --- src/Hash2Pub/DHTProtocol.hs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 1cce94d..bc5d5e3 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -53,7 +53,7 @@ import Control.Monad.IO.Class (MonadIO(..)) import Control.Monad.Except (MonadError(..), runExceptT) import qualified Data.ByteString as BS import Data.Either (rights) -import Data.Foldable (foldl', foldr') +import Data.Foldable (foldl', foldr', foldrM) import Data.Functor.Identity import Data.IP (IPv6, fromHostAddress6, toHostAddress6) @@ -596,8 +596,10 @@ sendQueryIdMessages targetID ns lParam targets = do -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing now <- getPOSIXTime -- collect cache entries from all responses - foldM (\acc resp -> do - let entrySet = case queryResult <$> payload resp of + foldrM (\resp acc -> do + let + responseResult = queryResult <$> payload resp + entrySet = case responseResult of Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) Just (FORWARD resultset) -> resultset _ -> Set.empty @@ -607,10 +609,15 @@ sendQueryIdMessages targetID ns lParam targets = do -- return accumulated QueryResult
pure $ case acc of -- once a FOUND as been encountered, return this as a result - isFound@FOUND{} -> isFound - FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet + FOUND{} -> acc + FORWARD accSet + | maybe False isFound responseResult -> fromJust responseResult + | otherwise -> FORWARD $ entrySet `Set.union` accSet ) (FORWARD Set.empty) responses + where + isFound FOUND{} = True + isFound _ = False -- | Create a QueryID message to be supplied to 'sendRequestTo' lookupMessage :: Integral i From f1b15d5a9e82c944092b65f5330e4835027539bf Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Wed, 26 Aug 2020 17:43:32 +0200 Subject: [PATCH 16/18] bugfix: fix join by adding join node and waiting for it - additionally to adding neighbours of join node, add the join node itself as a neighbour as well - wait for migrations from the node --- src/Hash2Pub/DHTProtocol.hs | 15 +++++++-------- src/Hash2Pub/PostService.hs | 9 +-------- 2 files changed, 8 insertions(+), 16 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index bc5d5e3..61fe7cd 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -516,20 +516,19 @@ requestJoin toJoinOn ownStateSTM = do ([], Set.empty, Set.empty) responses -- sort, slice and set the accumulated successors and predecessors - newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap + -- the contacted node itself is a successor as well and, with few + -- nodes, can be a predecessor as well + newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ if responses == Set.empty then pure . 
Left $ "join error: got no response from " <> show (getNid toJoinOn) - else if null (predecessors joinedState) && null (successors joinedState) - then pure $ Left "join error: no predecessors or successors" - -- successful join - else do - -- wait for migration data to be completely received - waitForMigrationFrom (nodeService prn) (getNid ownState) - pure $ Right ownStateSTM + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid toJoinOn) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index fe013a0..92f784a 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,16 +10,10 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async -import Control.Concurrent.MVar import Control.Concurrent.STM -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TChan -import Control.Concurrent.STM.TQueue -import Control.Concurrent.STM.TVar import Control.Exception (Exception (..), try) import Control.Monad (foldM, forM, forM_, forever, when, void) import Control.Monad.IO.Class (liftIO) -import Control.Monad.STM import Data.Bifunctor import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.UTF8 as BSU @@ -40,7 +34,6 @@ import Text.Read (readEither) import qualified Network.Wai.Handler.Warp as Warp import Servant import Servant.Client -import Servant.Server import Hash2Pub.FediChordTypes import Hash2Pub.RingMap @@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do - -- collect tag intearval + -- collect tag interval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers 
serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts From ab9d593a1bcf91a1d6626c18ab938bcef80bb986 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 27 Aug 2020 00:25:02 +0200 Subject: [PATCH 17/18] bugfix: fix wrong partial Response sender access - replaces improper record field access of `sender`, only existing in a Request, by `senderID` of a Response - fixes the resulting exception-crash - adds new function that enqueues a verification mark and timestamp bump of an existing cache entry --- Hash2Pub.cabal | 2 +- src/Hash2Pub/DHTProtocol.hs | 15 +++++++++++++-- 2 files changed, 14 insertions(+), 3 deletions(-) diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 2cc2d84..2d195e3 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md common deps build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays - ghc-options: -Wall + ghc-options: -Wall -Wpartial-fields diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 61fe7cd..45af727 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -658,8 +658,7 @@ requestStabilise ns neighbour = do ) ([],[]) respSet -- update successfully responded neighbour in cache - now <- getPOSIXTime - maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) + maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) pure $ if null responsePreds && null responseSuccs then Left "no neighbours returned" else Right 
(responsePreds, responseSuccs) @@ -826,6 +825,18 @@ queueDeleteEntry :: NodeID -> IO () queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete + +-- | enqueue the timestamp update and verification marking of an entry in the +-- global 'NodeCache'. +queueUpdateVerifieds :: Foldable c + => c NodeID + -> LocalNodeState s + -> IO () +queueUpdateVerifieds nIds ns = do + now <- getPOSIXTime + forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ + markCacheEntryAsVerified (Just now) nid' + -- | retry an IO action at most *i* times until it delivers a result attempts :: Int -- ^ number of retries *i* -> IO (Maybe a) -- ^ action to retry From 1a962f1500e3af4acafb167023f640aa34eae728 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Thu, 27 Aug 2020 00:33:19 +0200 Subject: [PATCH 18/18] stylish run --- app/Experiment.hs | 18 +++++++++--------- src/Hash2Pub/DHTProtocol.hs | 12 ++++++------ src/Hash2Pub/FediChord.hs | 4 ++-- src/Hash2Pub/PostService.hs | 34 +++++++++++++++++----------------- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/app/Experiment.hs b/app/Experiment.hs index 51b8e88..deb4cae 100644 --- a/app/Experiment.hs +++ b/app/Experiment.hs @@ -1,16 +1,16 @@ -{-# LANGUAGE OverloadedStrings #-} +{-# LANGUAGE OverloadedStrings #-} module Main where -import System.Random -import Control.Concurrent -import Control.Monad (forM_) -import Control.Monad.State.Class -import Control.Monad.State.Strict (evalStateT) -import Control.Monad.IO.Class -import qualified Network.HTTP.Client as HTTP +import Control.Concurrent +import Control.Monad (forM_) +import Control.Monad.IO.Class +import Control.Monad.State.Class +import Control.Monad.State.Strict (evalStateT) +import qualified Network.HTTP.Client as HTTP +import System.Random -import Hash2Pub.PostService (clientPublishPost, Hashtag) +import Hash2Pub.PostService (Hashtag, clientPublishPost) -- placeholder post data definition diff --git a/src/Hash2Pub/DHTProtocol.hs 
b/src/Hash2Pub/DHTProtocol.hs index 45af727..fa5a54a 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -49,8 +49,8 @@ import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception import Control.Monad (foldM, forM, forM_, void, when) -import Control.Monad.IO.Class (MonadIO(..)) -import Control.Monad.Except (MonadError(..), runExceptT) +import Control.Monad.Except (MonadError (..), runExceptT) +import Control.Monad.IO.Class (MonadIO (..)) import qualified Data.ByteString as BS import Data.Either (rights) import Data.Foldable (foldl', foldr', foldrM) @@ -516,7 +516,7 @@ requestJoin toJoinOn ownStateSTM = do ([], Set.empty, Set.empty) responses -- sort, slice and set the accumulated successors and predecessors - -- the contacted node itself is a successor as well and, with few + -- the contacted node itself is a successor as well and, with few -- nodes, can be a predecessor as well newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . 
setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap writeTVar ownStateSTM newState @@ -596,7 +596,7 @@ sendQueryIdMessages targetID ns lParam targets = do now <- getPOSIXTime -- collect cache entries from all responses foldrM (\resp acc -> do - let + let responseResult = queryResult <$> payload resp entrySet = case responseResult of Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) @@ -609,14 +609,14 @@ sendQueryIdMessages targetID ns lParam targets = do pure $ case acc of -- once a FOUND as been encountered, return this as a result FOUND{} -> acc - FORWARD accSet + FORWARD accSet | maybe False isFound responseResult -> fromJust responseResult | otherwise -> FORWARD $ entrySet `Set.union` accSet ) (FORWARD Set.empty) responses where isFound FOUND{} = True - isFound _ = False + isFound _ = False -- | Create a QueryID message to be supplied to 'sendRequestTo' lookupMessage :: Integral i diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 15cee10..45d0bf9 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -504,7 +504,7 @@ stabiliseThread nsSTM = forever $ do forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do ns' <- readTVarIO nsSTM nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') - either + either (const $ pure ()) (\entry -> atomically $ do latestNs <- readTVar nsSTM @@ -782,7 +782,7 @@ updateLookupCache nodeSTM keyToLookup = do -- TODO: better retry management, because having no vserver joined yet should -- be treated differently than other reasons for not getting a result. 
newResponsible <- runExceptT $ requestQueryID n keyToLookup - either + either (const $ pure Nothing) (\result -> do let newEntry = (getDomain result, getServicePort result) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 92f784a..81cf552 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -11,27 +11,27 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM -import Control.Exception (Exception (..), try) -import Control.Monad (foldM, forM, forM_, forever, when, void) -import Control.Monad.IO.Class (liftIO) +import Control.Exception (Exception (..), try) +import Control.Monad (foldM, forM, forM_, forever, void, + when) +import Control.Monad.IO.Class (liftIO) import Data.Bifunctor -import qualified Data.ByteString.Lazy.UTF8 as BSUL -import qualified Data.ByteString.UTF8 as BSU -import qualified Data.HashMap.Strict as HMap -import qualified Data.HashSet as HSet -import Data.Maybe (fromMaybe, isJust) -import Data.String (fromString) -import qualified Data.Text.Lazy as Txt -import Data.Text.Normalize (NormalizationMode (NFC), - normalize) +import qualified Data.ByteString.Lazy.UTF8 as BSUL +import qualified Data.ByteString.UTF8 as BSU +import qualified Data.HashMap.Strict as HMap +import qualified Data.HashSet as HSet +import Data.Maybe (fromMaybe, isJust) +import Data.String (fromString) +import qualified Data.Text.Lazy as Txt +import Data.Text.Normalize (NormalizationMode (NFC), normalize) import Data.Time.Clock.POSIX -import Data.Typeable (Typeable) -import qualified Network.HTTP.Client as HTTP -import qualified Network.HTTP.Types as HTTPT +import Data.Typeable (Typeable) +import qualified Network.HTTP.Client as HTTP +import qualified Network.HTTP.Types as HTTPT import System.Random -import Text.Read (readEither) +import Text.Read (readEither) -import qualified Network.Wai.Handler.Warp as Warp +import qualified Network.Wai.Handler.Warp as Warp 
import Servant import Servant.Client