From 5f7ca23f71f9aa1b9e3f1b50634008d7e4d50e01 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Fri, 14 Aug 2020 22:59:14 +0200 Subject: [PATCH 01/12] add missing leave request sending function --- src/Hash2Pub/DHTProtocol.hs | 38 +++++++++++++++++++++++++++++++++---- 1 file changed, 34 insertions(+), 4 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index a071132..ca87295 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -338,15 +338,15 @@ respondLeave nsSTM msgSet = do ) ([],[]) msgSet aRequestPart = Set.elemAt 0 msgSet - senderID = getNid . sender $ aRequestPart + leaveSenderID = getNid . sender $ aRequestPart responseMsg <- atomically $ do nsSnap <- readTVar nsSTM -- remove leaving node from successors, predecessors and NodeCache - writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry senderID + writeTQueue (cacheWriteQueue nsSnap) $ deleteCacheEntry leaveSenderID writeTVar nsSTM $ -- add predecessors and successors of leaving node to own lists - setPredecessors (filter ((/=) senderID . getNid) $ requestPreds <> predecessors nsSnap) - . setSuccessors (filter ((/=) senderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap + setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap) + . setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap -- TODO: handle handover of key data let leaveResponse = Response { requestID = requestID aRequestPart @@ -625,6 +625,36 @@ requestStabilise ns neighbour = do ) responses +-- | Send a Leave request to the specified node. +-- Service data transfer needs to be done separately, as not all neighbours +-- that need to know about the leaving handle the new service data. +requestLeave :: LocalNodeState s + -> RemoteNodeState -- target node + -> IO (Either String ()) -- error or success +requestLeave ns target = do + srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) + let leavePayload = LeaveRequestPayload { + leaveSuccessors = successors ns + , leavePredecessors = predecessors ns + } + responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid -> + Request { + requestID = rid + , sender = toRemoteNodeState ns + , part = 1 + , isFinalPart = False + , action = Leave + , payload = Just leavePayload + } + ) + ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) + either + -- forward IO error messages + (pure . Left) + -- empty payload, so no processing required + (const . pure . Right $ ()) + responses + requestPing :: LocalNodeState s -- ^ sending node -> RemoteNodeState -- ^ node to be PINGed -> IO (Either String [RemoteNodeState]) -- ^ all active vServers of the pinged node From 8db8907163a2771d6659d731bb1c08d134ef9ba2 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 15 Aug 2020 17:19:53 +0200 Subject: [PATCH 02/12] filter out spoofed requests for important operations like Join, Leave, Stabilise --- src/Hash2Pub/DHTProtocol.hs | 26 +++++++++++++++++++++++--- 1 file changed, 23 insertions(+), 3 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index ca87295..9f9d86d 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -262,6 +262,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do case headMay . Set.elems $ msgSet of Nothing -> pure () Just aPart -> do + let (SockAddrInet6 _ _ sourceIP _) = sourceAddr queueAddEntries (Identity $ RemoteCacheEntry (sender aPart) now) ns -- distinguish on whether and how to respond. If responding, pass message to response generating function and write responses to send queue maybe (pure ()) ( @@ -269,17 +270,36 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do ) =<< (case action aPart of Ping -> Just <$> respondPing nsSTM msgSet - Join -> Just <$> respondJoin nsSTM msgSet + Join -> dropSpoofedIDs sourceIP nsSTM msgSet respondJoin -- ToDo: figure out what happens if not joined QueryID -> Just <$> respondQueryID nsSTM msgSet -- only when joined - Leave -> if isJoined ns then Just <$> respondLeave nsSTM msgSet else pure Nothing - Stabilise -> if isJoined ns then Just <$> respondStabilise nsSTM msgSet else pure Nothing + Leave -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondLeave else pure Nothing + Stabilise -> if isJoined ns then dropSpoofedIDs sourceIP nsSTM msgSet respondStabilise else pure Nothing ) -- for single part request, response starts with part number 1. For multipart requests, response starts with part number n+1. -- TODO: determine request type only from first part, but catch RecSelError on each record access when folding, because otherwise different request type parts can make this crash -- TODO: test case: mixed message types of parts + where + -- | Filter out requests with spoofed node IDs by recomputing the ID using + -- the sender IP. + -- For valid (non-spoofed) sender IDs, the passed responder function is invoked. + dropSpoofedIDs :: HostAddress6 -- msg source address + -> LocalNodeStateSTM s + -> Set.Set FediChordMessage -- message parts of the request + -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- reponder function to be invoked for valid requests + -> IO (Maybe (Map.Map Integer BS.ByteString)) + dropSpoofedIDs addr nsSTM' msgSet' responder = + let + aRequestPart = Set.elemAt 0 msgSet + senderNs = sender aRequestPart + givenSenderID = getNid senderNs + recomputedID = genNodeID addr (getDomain senderNs) (fromInteger $ getVServerID senderNs) + in + if recomputedID == givenSenderID + then Just <$> responder nsSTM' msgSet' + else pure Nothing -- ....... response sending ....... From d2e4359a21a17f3c65864ee7cfd260663045c376 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 15 Aug 2020 17:37:06 +0200 Subject: [PATCH 03/12] rename join function to clarify it just joining a single vserver --- src/Hash2Pub/DHTProtocol.hs | 2 +- src/Hash2Pub/FediChord.hs | 22 ++++++++++------------ 2 files changed, 11 insertions(+), 13 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 9f9d86d..7ed5ec7 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -291,7 +291,7 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do -> (LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)) -- reponder function to be invoked for valid requests -> IO (Maybe (Map.Map Integer BS.ByteString)) dropSpoofedIDs addr nsSTM' msgSet' responder = - let + let aRequestPart = Set.elemAt 0 msgSet senderNs = sender aRequestPart givenSenderID = getNid senderNs diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 7911f3c..dbca8a5 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -1,9 +1,8 @@ -{-# LANGUAGE DataKinds #-} -{-# LANGUAGE DerivingStrategies #-} -{-# LANGUAGE FlexibleContexts #-} -{-# LANGUAGE FlexibleInstances #-} -{-# LANGUAGE OverloadedStrings #-} -{-# LANGUAGE TypeSynonymInstances #-} +{-# LANGUAGE DataKinds #-} +{-# LANGUAGE DerivingStrategies #-} +{-# LANGUAGE FlexibleContexts #-} +{-# LANGUAGE FlexibleInstances #-} +{-# LANGUAGE OverloadedStrings #-} {- | Module : FediChord Description : An opinionated implementation of the EpiChord DHT by Leong et al. @@ -40,7 +39,7 @@ module Hash2Pub.FediChord ( , bsAsIpAddr , FediChordConf(..) , fediChordInit - , fediChordJoin + , fediChordVserverJoin , fediChordBootstrapJoin , tryBootstrapJoining , fediMainThreads @@ -250,10 +249,10 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do -- | join a node to the DHT using the global node cache -- node's position. -fediChordJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' +fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message -fediChordJoin nsSTM = do +fediChordVserverJoin nsSTM = do ns <- readTVarIO nsSTM -- 1. get routed to the currently responsible node currentlyResponsible <- requestQueryID ns $ getNid ns @@ -284,14 +283,13 @@ joinOnNewEntriesThread nsSTM = loop pure () -- otherwise try joining FORWARD _ -> do - joinResult <- fediChordJoin nsSTM + joinResult <- fediChordVserverJoin nsSTM either -- on join failure, sleep and retry -- TODO: make delay configurable (const $ threadDelay (30 * 10^6) >> loop) (const $ pure ()) joinResult - emptyset = Set.empty -- because pattern matches don't accept qualified names -- | cache updater thread that waits for incoming NodeCache update instructions on @@ -485,7 +483,7 @@ stabiliseThread nsSTM = forever $ do threadDelay (60 * 10^6) where -- | send a stabilise request to the n-th neighbour - -- (specified by the provided getter function) and on failure retr + -- (specified by the provided getter function) and on failure retry -- with the n+1-th neighbour. -- On success, return 2 lists: The failed nodes and the potential neighbours -- returned by the queried node. From 4302452d18dd6fd88d472f2d8f1293e9d4774235 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 15 Aug 2020 22:55:19 +0200 Subject: [PATCH 04/12] implement vserver leave and trigger data transfer initiation still unused though contributes to #36 --- src/Hash2Pub/DHTProtocol.hs | 1 + src/Hash2Pub/FediChord.hs | 35 ++++++++++++++++++++++++++++++++++ src/Hash2Pub/FediChordTypes.hs | 6 ++++++ src/Hash2Pub/PostService.hs | 10 ++++++---- 4 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 7ed5ec7..8930edc 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -19,6 +19,7 @@ module Hash2Pub.DHTProtocol , sendQueryIdMessages , requestQueryID , requestJoin + , requestLeave , requestPing , requestStabilise , lookupMessage diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index dbca8a5..c55d94c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -263,6 +263,41 @@ fediChordVserverJoin nsSTM = do Left err -> pure . Left $ "Error joining on " <> err Right joinedNS -> pure . Right $ joinedNS +fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () +fediChordVserverLeave ns = do + -- TODO: deal with failure of all successors, e.g. by invoking a stabilise + -- and looking up further successors. So far we just fail here. + _ <- migrateSuccessor + -- then send leave messages to all other neighbours + -- TODO: distinguish between sending error causes on our side and on the + -- network/ target side. The latter cannot be fixed anyways while the + -- former could be worked around + + -- send a leave message to all neighbours + forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns + where + sendUntilSuccess i = maybe + (pure $ Left "Exhausted all successors") + (\neighb -> do + leaveResponse <- requestLeave ns neighb + case leaveResponse of + Left _ -> sendUntilSuccess (i+1) + -- return first successfully contacted neighbour, + -- so it can be contacted by the service layer for migration + Right _ -> pure $ Right neighb + ) + $ atMay (successors ns) i + migrateSuccessor :: (MonadError String m, MonadIO m) => m () + migrateSuccessor = do + -- send leave message to first responding successor + successorLeave <- liftIO $ sendUntilSuccess 0 + -- trigger service data transfer for abandoned key space + migrateToNode <- liftEither successorLeave + ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns) + migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) + liftEither migrationResult + + -- | Wait for new cache entries to appear and then try joining on them. -- Exits after successful joining. diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 20d65fe..214ece2 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -424,6 +424,12 @@ class Service s d where -- | run the service runService :: ServiceConf -> d -> IO (s d) getListeningPortFromService :: (Integral i) => s d -> i + -- | trigger a service data migration of data between the two given keys + migrateData :: s d + -> NodeID -- ^ start key + -> NodeID -- ^ end key + -> (String, Int) -- ^ hostname and port of target service + -> IO (Either String ()) -- ^ success or failure instance Hashable.Hashable NodeID where hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 797a9e6..71998df 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -115,6 +115,8 @@ instance DHT d => Service PostService d where getListeningPortFromService = fromIntegral . confServicePort . serviceConf + migrateData = clientDeliverSubscriptions + -- | return a WAI application postServiceApplication :: DHT d => PostService d -> Application @@ -320,13 +322,13 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM -- and their outstanding delivery queue to another instance. -- If the transfer succeeds, the transfered subscribers are removed from the local list. clientDeliverSubscriptions :: PostService d - -> Hashtag -- ^ fromTag - -> Hashtag -- ^ toTag + -> NodeID -- ^ fromTag + -> NodeID -- ^ toTag -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure -clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do +clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do -- collect tag intearval - intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv) + intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT From 470ce6f39af71d77b36e68e17c6033b2d73d4654 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 15 Aug 2020 23:58:47 +0200 Subject: [PATCH 05/12] correct the slice of transfered tags at leave --- src/Hash2Pub/FediChord.hs | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index c55d94c..44ea80a 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -293,8 +293,11 @@ fediChordVserverLeave ns = do successorLeave <- liftIO $ sendUntilSuccess 0 -- trigger service data transfer for abandoned key space migrateToNode <- liftEither successorLeave + let lowerKeyBound = maybe (getNid ns) getNid $ headMay (predecessors ns) ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns) - migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) + -- previously held data is the one between the immediate predecessor and + -- the own ID + migrationResult <- liftIO $ migrateData ownService lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) liftEither migrationResult From 581757965aa107532c81fb31fb72d568c881a42c Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 16 Aug 2020 17:53:48 +0200 Subject: [PATCH 06/12] trigger service data migration at join --- src/Hash2Pub/DHTProtocol.hs | 28 +++++++++++++++++++--------- src/Hash2Pub/FediChord.hs | 9 +++++---- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 8930edc..f5fcdbd 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -41,13 +41,14 @@ module Hash2Pub.DHTProtocol ) where +import Control.Concurrent import Control.Concurrent.Async import Control.Concurrent.STM import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception -import Control.Monad (foldM, forM, forM_) +import Control.Monad (foldM, forM, forM_, when) import qualified Data.ByteString as BS import Data.Either (rights) import Data.Foldable (foldl', foldr') @@ -75,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..), LocalNodeState (..), LocalNodeStateSTM, NodeCache, NodeID, NodeState (..), - RealNode (..), + RealNode (..), RealNodeSTM, RemoteNodeState (..), RingEntry (..), RingMap (..), - addRMapEntry, addRMapEntryWith, + Service (..), addRMapEntry, + addRMapEntryWith, cacheGetNodeStateUnvalidated, cacheLookup, cacheLookupPred, cacheLookupSucc, genNodeID, @@ -250,7 +252,8 @@ ackRequest _ _ = Map.empty -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- the response to be sent. -handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node +handleIncomingRequest :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the handling node -> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> Set.Set FediChordMessage -- ^ all parts of the request to handle -> SockAddr -- ^ source address of the request @@ -422,10 +425,10 @@ respondPing nsSTM msgSet = do -- this modifies node state, so locking and IO seems to be necessary. -- Still try to keep as much code as possible pure -respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do -- atomically read and modify the node state according to the parsed request - responseMsg <- atomically $ do + (dataMigration, responseMsg) <- atomically $ do nsSnap <- readTVar nsSTM cache <- readTVar $ nodeCacheSTM nsSnap let @@ -455,17 +458,24 @@ respondJoin nsSTM msgSet = do , payload = Just responsePayload } writeTVar nsSTM joinedNS - pure joinResponse + ownService <- nodeService <$> readTVar (parentRealNode nsSnap) + let + serviceDataMigrator = migrateData ownService lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS) + lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap) + pure (Just serviceDataMigrator, joinResponse) -- otherwise respond with empty payload - else pure Response { + else pure (Nothing, Response { requestID = requestID aRequestPart , senderID = getNid nsSnap , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 , isFinalPart = False , action = Join , payload = Nothing - } + }) + -- as DHT response is required immediately, fork the service data migration push + -- into a new thread. That's kind of ugly but the best I can think of so far + when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ()) pure $ serialiseMessage sendMessageSize responseMsg -- TODO: notify service layer to copy over data now handled by the new joined node diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 44ea80a..f3a482c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -281,10 +281,10 @@ fediChordVserverLeave ns = do (\neighb -> do leaveResponse <- requestLeave ns neighb case leaveResponse of - Left _ -> sendUntilSuccess (i+1) + Left _ -> sendUntilSuccess (i+1) -- return first successfully contacted neighbour, -- so it can be contacted by the service layer for migration - Right _ -> pure $ Right neighb + Right _ -> pure $ Right neighb ) $ atMay (successors ns) i migrateSuccessor :: (MonadError String m, MonadIO m) => m () @@ -579,7 +579,7 @@ sendThread sock sendQ = forever $ do sendAllTo sock packet addr -- | Sets up and manages the main server threads of FediChord -fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO () +fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO () fediMainThreads sock nsSTM = do ns <- readTVarIO nsSTM putStrLn "launching threads" @@ -622,7 +622,8 @@ requestMapPurge mapVar = forever $ do -- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- and pass them to their specific handling function. -fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue +fediMessageHandler :: Service s (RealNodeSTM s) + => TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue -> LocalNodeStateSTM s -- ^ acting NodeState -> IO () From 414564705a9bb8fb2f1f81938e7f607b4010e78a Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sun, 16 Aug 2020 23:26:18 +0200 Subject: [PATCH 07/12] possibility to wait for a migration to complete --- src/Hash2Pub/DHTProtocol.hs | 2 +- src/Hash2Pub/FediChord.hs | 2 +- src/Hash2Pub/FediChordTypes.hs | 3 ++ src/Hash2Pub/PostService.hs | 50 ++++++++++++++++++++++++---------- 4 files changed, 41 insertions(+), 16 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index f5fcdbd..52ea5ba 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -460,7 +460,7 @@ respondJoin nsSTM msgSet = do writeTVar nsSTM joinedNS ownService <- nodeService <$> readTVar (parentRealNode nsSnap) let - serviceDataMigrator = migrateData ownService lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS) + serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS) lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap) pure (Just serviceDataMigrator, joinResponse) -- otherwise respond with empty payload diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index f3a482c..8d25186 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -297,7 +297,7 @@ fediChordVserverLeave ns = do ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns) -- previously held data is the one between the immediate predecessor and -- the own ID - migrationResult <- liftIO $ migrateData ownService lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) + migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) liftEither migrationResult diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 214ece2..cbd3a58 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -426,10 +426,13 @@ class Service s d where getListeningPortFromService :: (Integral i) => s d -> i -- | trigger a service data migration of data between the two given keys migrateData :: s d + -> NodeID -- ^ source/ sender node ID -> NodeID -- ^ start key -> NodeID -- ^ end key -> (String, Int) -- ^ hostname and port of target service -> IO (Either String ()) -- ^ success or failure + -- | Wait for an incoming migration from a given node to succeed, may block forever + waitForMigrationFrom :: s d -> NodeID -> IO () instance Hashable.Hashable NodeID where hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 71998df..548469e 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -10,6 +10,7 @@ module Hash2Pub.PostService where import Control.Concurrent import Control.Concurrent.Async +import Control.Concurrent.MVar import Control.Concurrent.STM import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan @@ -46,20 +47,21 @@ import Hash2Pub.RingMap data PostService d = PostService - { serviceConf :: ServiceConf + { serviceConf :: ServiceConf -- queues, other data structures - , baseDHT :: (DHT d) => d - , serviceThread :: TVar ThreadId - , subscribers :: TVar RelayTags + , baseDHT :: (DHT d) => d + , serviceThread :: TVar ThreadId + , subscribers :: TVar RelayTags -- ^ for each tag store the subscribers + their queue - , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) + , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) -- ^ tags subscribed by the own node have an assigned lease time - , ownPosts :: TVar (HSet.HashSet Txt.Text) + , ownPosts :: TVar (HSet.HashSet Txt.Text) -- ^ just store the existence of posts for saving memory, - , relayInQueue :: TQueue (Hashtag, PostID, PostContent) + , relayInQueue :: TQueue (Hashtag, PostID, PostContent) -- ^ Queue for processing incoming posts of own instance asynchronously - , postFetchQueue :: TQueue PostID - , httpMan :: HTTP.Manager + , postFetchQueue :: TQueue PostID + , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ())) + , httpMan :: HTTP.Manager } deriving (Typeable) @@ -86,6 +88,7 @@ instance DHT d => Service PostService d where ownPostVar <- newTVarIO HSet.empty relayInQueue' <- newTQueueIO postFetchQueue' <- newTQueueIO + migrationsInProgress' <- newTVarIO HMap.empty httpMan' <- HTTP.newManager HTTP.defaultManagerSettings let thisService = PostService { @@ -97,6 +100,7 @@ instance DHT d => Service PostService d where , ownPosts = ownPostVar , relayInQueue = relayInQueue' , postFetchQueue = postFetchQueue' + , migrationsInProgress = migrationsInProgress' , httpMan = httpMan' } port' = fromIntegral (confServicePort conf) @@ -117,6 +121,17 @@ instance DHT d => Service PostService d where migrateData = clientDeliverSubscriptions + waitForMigrationFrom serv fromID = do + migrationSynchroniser <- atomically $ do + syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv) + maybe + -- decision: this function blocks until it gets an incoming migration from given ID + retry + pure + syncPoint + -- block until migration finished + takeMVar migrationSynchroniser + -- | return a WAI application postServiceApplication :: DHT d => PostService d -> Application @@ -136,7 +151,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- delivery endpoint of newly published posts of the relay's instance - :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text + :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text -- endpoint for delivering the subscriptions and outstanding queue :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text -- fetch endpoint for posts, full post ID is http://$domain/post/$postid @@ -194,10 +209,14 @@ newtype UnhandledTagException = UnhandledTagException String instance Exception UnhandledTagException -subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text -subscriptionDelivery serv subList = do +subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text +subscriptionDelivery serv senderID subList = do let tagSubs = Txt.lines subList + -- signal that the migration is in progress + syncMVar <- liftIO newEmptyMVar + liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $ + HMap.insert (fromInteger senderID) syncMVar -- In favor of having the convenience of rolling back the transaction once a -- not-handled tag occurs, this results in a single large transaction. -- Hopefully the performance isn't too bad. @@ -211,6 +230,8 @@ subscriptionDelivery serv subList = do `catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException)) -- TODO: potentially log this :: STM (Either String ())) + -- TODO: should this always signal migration finished to avoid deadlocksP + liftIO $ putMVar syncMVar () case res of Left err -> throwError err410 {errBody = BSUL.fromString err} Right _ -> pure "" @@ -322,11 +343,12 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM -- and their outstanding delivery queue to another instance. -- If the transfer succeeds, the transfered subscribers are removed from the local list. clientDeliverSubscriptions :: PostService d + -> NodeID -- ^ sender node ID -> NodeID -- ^ fromTag -> NodeID -- ^ toTag -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure -clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do +clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do -- collect tag intearval intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] @@ -350,7 +372,7 @@ clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do "" intervalTags -- send subscribers - resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) "")) + resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) "")) -- on failure return a Left, otherwise delete subscription entry case resp of Left err -> pure . Left . show $ err From c49c1a89c9c5774a810f31c963bb54221901682a Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 17 Aug 2020 00:22:37 +0200 Subject: [PATCH 08/12] wait for migration to complete on join also clean up migration entry after success --- src/Hash2Pub/DHTProtocol.hs | 13 ++++++++----- src/Hash2Pub/FediChord.hs | 18 ++++++++++-------- src/Hash2Pub/PostService.hs | 6 +++++- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 52ea5ba..13dd434 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -482,7 +482,7 @@ respondJoin nsSTM msgSet = do -- ....... request sending ....... -- | send a join request and return the joined 'LocalNodeState' including neighbours -requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted +requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted -> LocalNodeStateSTM s -- ^ joining NodeState -> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information requestJoin toJoinOn ownStateSTM = do @@ -521,12 +521,15 @@ requestJoin toJoinOn ownStateSTM = do pure (cacheInsertQ, newState) -- execute the cache insertions mapM_ (\f -> f joinedState) cacheInsertQ - pure $ if responses == Set.empty - then Left $ "join error: got no response from " <> show (getNid toJoinOn) + if responses == Set.empty + then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) else if null (predecessors joinedState) && null (successors joinedState) - then Left "join error: no predecessors or successors" + then pure $ Left "join error: no predecessors or successors" -- successful join - else Right ownStateSTM + else do + -- wait for migration data to be completely received + waitForMigrationFrom (nodeService prn) (getNid ownState) + pure $ Right ownStateSTM ) `catch` (\e -> pure . Left $ displayException (e :: IOException)) diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 8d25186..f544061 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -152,9 +152,10 @@ nodeStateInit realNodeSTM = do -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- for resolving the new node's position. -fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' - -> (String, PortNumber) -- ^ domain and port of a bootstrapping node - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a +fediChordBootstrapJoin :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the local 'NodeState' + -> (String, PortNumber) -- ^ domain and port of a bootstrapping node + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordBootstrapJoin nsSTM bootstrapNode = do -- can be invoked multiple times with all known bootstrapping nodes until successfully joined @@ -170,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Unjoined try joining instead. -convergenceSampleThread :: LocalNodeStateSTM s -> IO () +convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () convergenceSampleThread nsSTM = forever $ do nsSnap <- readTVarIO nsSTM parentNode <- readTVarIO $ parentRealNode nsSnap @@ -201,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) +tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) tryBootstrapJoining nsSTM = do bss <- atomically $ do nsSnap <- readTVar nsSTM @@ -249,8 +250,9 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do -- | join a node to the DHT using the global node cache -- node's position. -fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' - -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a +fediChordVserverJoin :: Service s (RealNodeSTM s) + => LocalNodeStateSTM s -- ^ the local 'NodeState' + -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -- successful join, otherwise an error message fediChordVserverJoin nsSTM = do ns <- readTVarIO nsSTM @@ -304,7 +306,7 @@ fediChordVserverLeave ns = do -- | Wait for new cache entries to appear and then try joining on them. -- Exits after successful joining. -joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO () +joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () joinOnNewEntriesThread nsSTM = loop where loop = do diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 548469e..c277327 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -231,7 +231,11 @@ subscriptionDelivery serv senderID subList = do -- TODO: potentially log this :: STM (Either String ())) -- TODO: should this always signal migration finished to avoid deadlocksP - liftIO $ putMVar syncMVar () + liftIO $ putMVar syncMVar () -- wakes up waiting thread + liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed + -- delete this migration from ongoing ones + liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $ + HMap.delete (fromInteger senderID) case res of Left err -> throwError err410 {errBody = BSUL.fromString err} Right _ -> pure "" From b8cedada4892a6abbea5b31e2599e80d26220118 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 17 Aug 2020 11:37:04 +0200 Subject: [PATCH 09/12] prevent threads not awaiting migration from blocking their response --- src/Hash2Pub/PostService.hs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index c277327..a871343 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -232,10 +232,12 @@ subscriptionDelivery serv senderID subList = do :: STM (Either String ())) -- TODO: should this always signal migration finished to avoid deadlocksP liftIO $ putMVar syncMVar () -- wakes up waiting thread - liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed - -- delete this migration from ongoing ones - liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $ - HMap.delete (fromInteger senderID) + -- allow response to be completed independently from waiting thread + _ <- liftIO . forkIO $ do + putMVar syncMVar () -- blocks until waiting thread has resumed + -- delete this migration from ongoing ones + liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $ + HMap.delete (fromInteger senderID) case res of Left err -> throwError err410 {errBody = BSUL.fromString err} Right _ -> pure "" From 6982a0b245ebcae0ab4b9c4f8b7cad5b0061a254 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 17 Aug 2020 12:34:47 +0200 Subject: [PATCH 10/12] indicate in LeaveRequest whether to expect a migration this information is used to decide whether to await an incoming migration in `respondLeave` --- FediChord.asn1 | 4 ++-- src/Hash2Pub/ASN1Coding.hs | 11 ++++++----- src/Hash2Pub/DHTProtocol.hs | 14 +++++++++----- src/Hash2Pub/FediChord.hs | 4 ++-- src/Hash2Pub/ProtocolTypes.hs | 1 + 5 files changed, 20 insertions(+), 14 deletions(-) diff --git a/FediChord.asn1 b/FediChord.asn1 index f278f8f..79b894a 100644 --- a/FediChord.asn1 +++ b/FediChord.asn1 @@ -89,8 +89,8 @@ StabiliseResponsePayload ::= SEQUENCE { LeaveRequestPayload ::= SEQUENCE { successors SEQUENCE OF NodeState, - predecessors SEQUENCE OF NodeState - -- ToDo: transfer of own data to newly responsible node + predecessors SEQUENCE OF NodeState, + doMigration BOOLEAN } LeaveResponsePayload ::= NULL -- just a confirmation diff --git a/src/Hash2Pub/ASN1Coding.hs b/src/Hash2Pub/ASN1Coding.hs index 456dac6..10177ab 100644 --- a/src/Hash2Pub/ASN1Coding.hs +++ b/src/Hash2Pub/ASN1Coding.hs @@ -38,6 +38,7 @@ splitPayload numParts pl@LeaveRequestPayload{} = [ LeaveRequestPayload { leaveSuccessors = atDef [] (listInto numParts $ leaveSuccessors pl) (thisPart-1) , leavePredecessors = atDef [] (listInto numParts $ leavePredecessors pl) (thisPart-1) + , leaveDoMigration = leaveDoMigration pl } | thisPart <- [1..numParts] ] splitPayload numParts pl@StabiliseResponsePayload{} = [ StabiliseResponsePayload { @@ -134,9 +135,8 @@ encodePayload payload'@LeaveRequestPayload{} = <> [End Sequence , Start Sequence] <> concatMap encodeNodeState (leavePredecessors payload') - <> [End Sequence - , End Sequence] --- currently StabiliseResponsePayload and LeaveRequestPayload are equal + <> [End Sequence] + <> [Boolean (leaveDoMigration payload'), End Sequence] encodePayload payload'@StabiliseResponsePayload{} = Start Sequence : Start Sequence @@ -144,8 +144,7 @@ encodePayload payload'@StabiliseResponsePayload{} = <> [End Sequence , Start Sequence] <> concatMap encodeNodeState (stabilisePredecessors payload') - <> [End Sequence - , End Sequence] + <> [End Sequence, End Sequence] encodePayload payload'@StabiliseRequestPayload = [Null] encodePayload payload'@QueryIDResponsePayload{} = let @@ -415,9 +414,11 @@ parseLeaveRequest :: ParseASN1 ActionPayload parseLeaveRequest = onNextContainer Sequence $ do succ' <- onNextContainer Sequence (getMany parseNodeState) pred' <- onNextContainer Sequence (getMany parseNodeState) + doMigration <- parseBool pure $ LeaveRequestPayload { leaveSuccessors = succ' , leavePredecessors = pred' + , leaveDoMigration = doMigration } parseLeaveResponse :: ParseASN1 ActionPayload diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 13dd434..972059f 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -48,7 +48,7 @@ import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar import Control.Exception -import Control.Monad (foldM, forM, forM_, when) +import Control.Monad (foldM, forM, forM_, void, when) import qualified Data.ByteString as BS import Data.Either (rights) import Data.Foldable (foldl', foldr') @@ -352,8 +352,7 @@ respondQueryID nsSTM msgSet = do -- | Respond to a Leave request by removing the leaving node from local data structures -- and confirming with response. --- TODO: copy over key data from leaver and confirm -respondLeave :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) +respondLeave :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondLeave nsSTM msgSet = do -- combine payload of all parts let (requestPreds, requestSuccs) = foldr' (\msg (predAcc, succAcc) -> @@ -371,7 +370,6 @@ respondLeave nsSTM msgSet = do -- add predecessors and successors of leaving node to own lists setPredecessors (filter ((/=) leaveSenderID . getNid) $ requestPreds <> predecessors nsSnap) . setSuccessors (filter ((/=) leaveSenderID . getNid) $ requestSuccs <> successors nsSnap) $ nsSnap - -- TODO: handle handover of key data let leaveResponse = Response { requestID = requestID aRequestPart , senderID = getNid nsSnap @@ -381,6 +379,10 @@ respondLeave nsSTM msgSet = do , payload = Just LeaveResponsePayload } pure leaveResponse + -- if awaiting an incoming service data migration, collect the lock without blocking this thread + when (maybe False leaveDoMigration (payload aRequestPart)) $ do + ownService <- atomically $ nodeService <$> ((readTVar nsSTM) >>= (readTVar . parentRealNode)) + void (forkIO $ waitForMigrationFrom ownService leaveSenderID) pure $ serialiseMessage sendMessageSize responseMsg -- | respond to stabilise requests by returning successor and predecessor list @@ -663,13 +665,15 @@ requestStabilise ns neighbour = do -- Service data transfer needs to be done separately, as not all neighbours -- that need to know about the leaving handle the new service data. requestLeave :: LocalNodeState s + -> Bool -- whether to migrate service data -> RemoteNodeState -- target node -> IO (Either String ()) -- error or success -requestLeave ns target = do +requestLeave ns doMigration target = do srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) let leavePayload = LeaveRequestPayload { leaveSuccessors = successors ns , leavePredecessors = predecessors ns + , leaveDoMigration = doMigration } responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid -> Request { diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index f544061..399ddfd 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -276,12 +276,12 @@ fediChordVserverLeave ns = do -- former could be worked around -- send a leave message to all neighbours - forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns + forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns False where sendUntilSuccess i = maybe (pure $ Left "Exhausted all successors") (\neighb -> do - leaveResponse <- requestLeave ns neighb + leaveResponse <- requestLeave ns True neighb case leaveResponse of Left _ -> sendUntilSuccess (i+1) -- return first successfully contacted neighbour, diff --git a/src/Hash2Pub/ProtocolTypes.hs b/src/Hash2Pub/ProtocolTypes.hs index 37c00e9..86825a7 100644 --- a/src/Hash2Pub/ProtocolTypes.hs +++ b/src/Hash2Pub/ProtocolTypes.hs @@ -55,6 +55,7 @@ data ActionPayload = QueryIDRequestPayload | LeaveRequestPayload { leaveSuccessors :: [RemoteNodeState] , leavePredecessors :: [RemoteNodeState] + , leaveDoMigration :: Bool } | StabiliseRequestPayload | PingRequestPayload From 969f6d7fc204111b7c5653fc4b064bfc489d0ad3 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Mon, 17 Aug 2020 13:39:22 +0200 Subject: [PATCH 11/12] fix tests --- test/FediChordSpec.hs | 1 + 1 file changed, 1 insertion(+) diff --git a/test/FediChordSpec.hs b/test/FediChordSpec.hs index ed1f3c8..6a3ca5d 100644 --- a/test/FediChordSpec.hs +++ b/test/FediChordSpec.hs @@ -189,6 +189,7 @@ spec = do lReqPayload = LeaveRequestPayload { leaveSuccessors = someNodes , leavePredecessors = someNodes + , leaveDoMigration = True } stabReqPayload = StabiliseRequestPayload pingReqPayload = PingRequestPayload From fce5ff9153ba0efc7494ca915626d93e69e83c33 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Tue, 18 Aug 2020 00:17:13 +0200 Subject: [PATCH 12/12] implement service data migration for stabilise --- src/Hash2Pub/DHTProtocol.hs | 5 ++--- src/Hash2Pub/FediChord.hs | 30 ++++++++++++++++++++++++++---- 2 files changed, 28 insertions(+), 7 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 972059f..bd7953f 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -381,7 +381,7 @@ respondLeave nsSTM msgSet = do pure leaveResponse -- if awaiting an incoming service data migration, collect the lock without blocking this thread when (maybe False leaveDoMigration (payload aRequestPart)) $ do - ownService <- atomically $ nodeService <$> ((readTVar nsSTM) >>= (readTVar . parentRealNode)) + ownService <- atomically $ nodeService <$> (readTVar nsSTM >>= (readTVar . parentRealNode)) void (forkIO $ waitForMigrationFrom ownService leaveSenderID) pure $ serialiseMessage sendMessageSize responseMsg @@ -425,8 +425,7 @@ respondPing nsSTM msgSet = do } pure $ serialiseMessage sendMessageSize pingResponse --- this modifies node state, so locking and IO seems to be necessary. --- Still try to keep as much code as possible pure + respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin nsSTM msgSet = do -- atomically read and modify the node state according to the parsed request diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 399ddfd..15563de 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -465,9 +465,9 @@ checkCacheSliceInvariants ns -- | Periodically send @StabiliseRequest' s to the closest neighbour nodes, until -- one responds, and get their neighbours for maintaining the own neighbour lists. -- If necessary, request new neighbours. -stabiliseThread :: LocalNodeStateSTM s -> IO () +stabiliseThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO () stabiliseThread nsSTM = forever $ do - ns <- readTVarIO nsSTM + oldNs <- readTVarIO nsSTM putStrLn "stabilise run: begin" @@ -478,8 +478,8 @@ stabiliseThread nsSTM = forever $ do -- don't contact all neighbours unless the previous one failed/ Left ed - predStabilise <- stabiliseClosestResponder ns predecessors 1 [] - succStabilise <- stabiliseClosestResponder ns predecessors 1 [] + predStabilise <- stabiliseClosestResponder oldNs predecessors 1 [] + succStabilise <- stabiliseClosestResponder oldNs predecessors 1 [] let (predDeletes, predNeighbours) = either (const ([], [])) id predStabilise @@ -518,6 +518,28 @@ stabiliseThread nsSTM = forever $ do writeTVar nsSTM $ addSuccessors [nextEntry] latestNs ) + newNs <- readTVarIO nsSTM + + let + oldPredecessor = headDef (toRemoteNodeState oldNs) $ predecessors oldNs + newPredecessor = headMay $ predecessors newNs + -- manage need for service data migration: + maybe (pure ()) (\newPredecessor' -> + when ( + isJust newPredecessor + && oldPredecessor /= newPredecessor' + -- case: predecessor has changed in some way => own responsibility has changed in some way + -- case 1: new predecessor is further away => broader responsibility, but new pred needs to push the data + -- If this is due to a node leaving without transfering its data, try getting it from a redundant copy + -- case 2: new predecessor is closer, it takes some of our data but somehow didn't join on us => push data to it + && isInOwnResponsibilitySlice newPredecessor' oldNs) $ do + ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode newNs) + migrationResult <- migrateData ownService (getNid newNs) (getNid oldPredecessor) (getNid newPredecessor') (getDomain newPredecessor', fromIntegral $ getServicePort newPredecessor') + -- TODO: deal with migration failure, e.g retry + pure () + ) + newPredecessor + putStrLn "stabilise run: end" -- TODO: make delay configurable threadDelay (60 * 10^6)