From 4302452d18dd6fd88d472f2d8f1293e9d4774235 Mon Sep 17 00:00:00 2001 From: Trolli Schmittlauch Date: Sat, 15 Aug 2020 22:55:19 +0200 Subject: [PATCH] implement vserver leave and trigger data transfer initiation still unused though contributes to #36 --- src/Hash2Pub/DHTProtocol.hs | 1 + src/Hash2Pub/FediChord.hs | 35 ++++++++++++++++++++++++++++++++++ src/Hash2Pub/FediChordTypes.hs | 6 ++++++ src/Hash2Pub/PostService.hs | 10 ++++++---- 4 files changed, 48 insertions(+), 4 deletions(-) diff --git a/src/Hash2Pub/DHTProtocol.hs b/src/Hash2Pub/DHTProtocol.hs index 7ed5ec7..8930edc 100644 --- a/src/Hash2Pub/DHTProtocol.hs +++ b/src/Hash2Pub/DHTProtocol.hs @@ -19,6 +19,7 @@ module Hash2Pub.DHTProtocol , sendQueryIdMessages , requestQueryID , requestJoin + , requestLeave , requestPing , requestStabilise , lookupMessage diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index dbca8a5..c55d94c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -263,6 +263,41 @@ fediChordVserverJoin nsSTM = do Left err -> pure . Left $ "Error joining on " <> err Right joinedNS -> pure . Right $ joinedNS +fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () +fediChordVserverLeave ns = do + -- TODO: deal with failure of all successors, e.g. by invoking a stabilise + -- and looking up further successors. So far we just fail here. + _ <- migrateSuccessor + -- then send leave messages to all other neighbours + -- TODO: distinguish between sending error causes on our side and on the + -- network/ target side. The latter cannot be fixed anyways while the + -- former could be worked around + + -- send a leave message to all neighbours + forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns + where + sendUntilSuccess i = maybe + (pure $ Left "Exhausted all successors") + (\neighb -> do + leaveResponse <- requestLeave ns neighb + case leaveResponse of + Left _ -> sendUntilSuccess (i+1) + -- return first successfully contacted neighbour, + -- so it can be contacted by the service layer for migration + Right _ -> pure $ Right neighb + ) + $ atMay (successors ns) i + migrateSuccessor :: (MonadError String m, MonadIO m) => m () + migrateSuccessor = do + -- send leave message to first responding successor + successorLeave <- liftIO $ sendUntilSuccess 0 + -- trigger service data transfer for abandoned key space + migrateToNode <- liftEither successorLeave + ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns) + migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) + liftEither migrationResult + + -- | Wait for new cache entries to appear and then try joining on them. -- Exits after successful joining. diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 20d65fe..214ece2 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -424,6 +424,12 @@ class Service s d where -- | run the service runService :: ServiceConf -> d -> IO (s d) getListeningPortFromService :: (Integral i) => s d -> i + -- | trigger a service data migration of data between the two given keys + migrateData :: s d + -> NodeID -- ^ start key + -> NodeID -- ^ end key + -> (String, Int) -- ^ hostname and port of target service + -> IO (Either String ()) -- ^ success or failure instance Hashable.Hashable NodeID where hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 797a9e6..71998df 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -115,6 +115,8 @@ instance DHT d => Service PostService d where getListeningPortFromService = fromIntegral . confServicePort . serviceConf + migrateData = clientDeliverSubscriptions + -- | return a WAI application postServiceApplication :: DHT d => PostService d -> Application @@ -320,13 +322,13 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM -- and their outstanding delivery queue to another instance. -- If the transfer succeeds, the transfered subscribers are removed from the local list. clientDeliverSubscriptions :: PostService d - -> Hashtag -- ^ fromTag - -> Hashtag -- ^ toTag + -> NodeID -- ^ fromTag + -> NodeID -- ^ toTag -> (String, Int) -- ^ hostname and port of instance to deliver to -> IO (Either String ()) -- Either signals success or failure -clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do +clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do -- collect tag intearval - intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv) + intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- extract subscribers and posts -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT