implement vserver leave and trigger data transfer initiation
still unused, though it contributes to #36
parent d2e4359a21
commit 4302452d18
@@ -19,6 +19,7 @@ module Hash2Pub.DHTProtocol
     , sendQueryIdMessages
     , requestQueryID
     , requestJoin
+    , requestLeave
     , requestPing
     , requestStabilise
     , lookupMessage
@@ -263,6 +263,41 @@ fediChordVserverJoin nsSTM = do
         Left err -> pure . Left $ "Error joining on " <> err
         Right joinedNS -> pure . Right $ joinedNS
 
+
+fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
+fediChordVserverLeave ns = do
+    -- TODO: deal with failure of all successors, e.g. by invoking a stabilise
+    -- and looking up further successors. So far we just fail here.
+    _ <- migrateSuccessor
+    -- then send leave messages to all other neighbours
+    -- TODO: distinguish between sending error causes on our side and on the
+    -- network/ target side. The latter cannot be fixed anyways while the
+    -- former could be worked around
+
+    -- send a leave message to all neighbours
+    forM_ (predecessors ns <> successors ns) $ liftIO . requestLeave ns
+  where
+    sendUntilSuccess i = maybe
+        (pure $ Left "Exhausted all successors")
+        (\neighb -> do
+            leaveResponse <- requestLeave ns neighb
+            case leaveResponse of
+                Left _  -> sendUntilSuccess (i+1)
+                -- return first successfully contacted neighbour,
+                -- so it can be contacted by the service layer for migration
+                Right _ -> pure $ Right neighb
+        )
+        $ atMay (successors ns) i
+    migrateSuccessor :: (MonadError String m, MonadIO m) => m ()
+    migrateSuccessor = do
+        -- send leave message to first responding successor
+        successorLeave <- liftIO $ sendUntilSuccess 0
+        -- trigger service data transfer for abandoned key space
+        migrateToNode <- liftEither successorLeave
+        ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
+        migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
+        liftEither migrationResult
+
+
 -- | Wait for new cache entries to appear and then try joining on them.
 -- Exits after successful joining.
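fediChordVserverLeave runs in any monad providing MonadError String and MonadIO, so a caller has to discharge the error effect itself, for example via ExceptT. The routine is not hooked up anywhere yet (the commit message notes it is still unused); the following is only a rough sketch of a possible call site, with the wrapper name and logging made up, assuming runExceptT from Control.Monad.Except is in scope:

-- hypothetical caller sketch, not part of this commit
leaveAndLog :: Service s (RealNodeSTM s) => LocalNodeState s -> IO ()
leaveAndLog ns = do
    -- instantiate the MonadError/MonadIO constraints with ExceptT String IO
    leaveResult <- runExceptT $ fediChordVserverLeave ns
    case leaveResult of
        Left err -> putStrLn $ "vserver leave failed: " <> err
        Right () -> putStrLn "vserver left the DHT and handed off its data"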
@@ -424,6 +424,12 @@ class Service s d where
     -- | run the service
     runService :: ServiceConf -> d -> IO (s d)
     getListeningPortFromService :: (Integral i) => s d -> i
+    -- | trigger a service data migration of data between the two given keys
+    migrateData :: s d
+        -> NodeID -- ^ start key
+        -> NodeID -- ^ end key
+        -> (String, Int) -- ^ hostname and port of target service
+        -> IO (Either String ()) -- ^ success or failure
 
 instance Hashable.Hashable NodeID where
     hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
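migrateData is the hook that fediChordVserverLeave above reaches through migrateSuccessor: the two NodeID arguments bound the abandoned key space and the (String, Int) pair names the target service endpoint. A direct call would look roughly like the sketch below; the helper name, host name and port are made up for illustration only:

-- hypothetical call site, endpoint values chosen purely for illustration
demoMigration :: Service s d => s d -> NodeID -> NodeID -> IO ()
demoMigration serv fromKey toKey = do
    result <- migrateData serv fromKey toKey ("successor.example.org", 3001)
    either (putStrLn . ("migration failed: " <>))
           (const $ putStrLn "migration done")
           result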
@@ -115,6 +115,8 @@ instance DHT d => Service PostService d where
 
     getListeningPortFromService = fromIntegral . confServicePort . serviceConf
 
+    migrateData = clientDeliverSubscriptions
+
 
 -- | return a WAI application
 postServiceApplication :: DHT d => PostService d -> Application
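Wiring migrateData straight to clientDeliverSubscriptions is what motivates the signature change in the next hunk: the DHT layer hands over raw NodeID key-space boundaries, so clientDeliverSubscriptions can no longer take Hashtag arguments and hash them itself.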
@@ -320,13 +322,13 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM
 -- and their outstanding delivery queue to another instance.
 -- If the transfer succeeds, the transfered subscribers are removed from the local list.
 clientDeliverSubscriptions :: PostService d
-    -> Hashtag -- ^ fromTag
-    -> Hashtag -- ^ toTag
+    -> NodeID -- ^ fromTag
+    -> NodeID -- ^ toTag
     -> (String, Int) -- ^ hostname and port of instance to deliver to
     -> IO (Either String ()) -- Either signals success or failure
-clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
+clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
     -- collect tag intearval
-    intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv)
+    intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
     -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
     -- extract subscribers and posts
     -- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
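Callers that still think in hashtags rather than keys can recover the old interface by hashing the tags themselves, exactly as the removed line did. A possible convenience wrapper, not part of this commit, with a made-up name and assuming the module's existing genKeyID helper and qualified Txt text import:

-- hypothetical wrapper on top of the new key-based signature
clientDeliverSubscriptionsByTag :: PostService d -> Hashtag -> Hashtag -> (String, Int) -> IO (Either String ())
clientDeliverSubscriptionsByTag serv fromTag toTag target =
    -- hash both tags to their DHT keys, then delegate to the key-based function
    clientDeliverSubscriptions serv (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) target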