possibility to wait for a migration to complete
This commit is contained in:
parent
581757965a
commit
414564705a
|
@ -460,7 +460,7 @@ respondJoin nsSTM msgSet = do
|
||||||
writeTVar nsSTM joinedNS
|
writeTVar nsSTM joinedNS
|
||||||
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
|
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
|
||||||
let
|
let
|
||||||
serviceDataMigrator = migrateData ownService lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
|
serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
|
||||||
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
|
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
|
||||||
pure (Just serviceDataMigrator, joinResponse)
|
pure (Just serviceDataMigrator, joinResponse)
|
||||||
-- otherwise respond with empty payload
|
-- otherwise respond with empty payload
|
||||||
|
|
|
@ -297,7 +297,7 @@ fediChordVserverLeave ns = do
|
||||||
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
|
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
|
||||||
-- previously held data is the one between the immediate predecessor and
|
-- previously held data is the one between the immediate predecessor and
|
||||||
-- the own ID
|
-- the own ID
|
||||||
migrationResult <- liftIO $ migrateData ownService lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
|
migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
|
||||||
liftEither migrationResult
|
liftEither migrationResult
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -426,10 +426,13 @@ class Service s d where
|
||||||
getListeningPortFromService :: (Integral i) => s d -> i
|
getListeningPortFromService :: (Integral i) => s d -> i
|
||||||
-- | trigger a service data migration of data between the two given keys
|
-- | trigger a service data migration of data between the two given keys
|
||||||
migrateData :: s d
|
migrateData :: s d
|
||||||
|
-> NodeID -- ^ source/ sender node ID
|
||||||
-> NodeID -- ^ start key
|
-> NodeID -- ^ start key
|
||||||
-> NodeID -- ^ end key
|
-> NodeID -- ^ end key
|
||||||
-> (String, Int) -- ^ hostname and port of target service
|
-> (String, Int) -- ^ hostname and port of target service
|
||||||
-> IO (Either String ()) -- ^ success or failure
|
-> IO (Either String ()) -- ^ success or failure
|
||||||
|
-- | Wait for an incoming migration from a given node to succeed, may block forever
|
||||||
|
waitForMigrationFrom :: s d -> NodeID -> IO ()
|
||||||
|
|
||||||
instance Hashable.Hashable NodeID where
|
instance Hashable.Hashable NodeID where
|
||||||
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
||||||
|
|
|
@ -10,6 +10,7 @@ module Hash2Pub.PostService where
|
||||||
|
|
||||||
import Control.Concurrent
|
import Control.Concurrent
|
||||||
import Control.Concurrent.Async
|
import Control.Concurrent.Async
|
||||||
|
import Control.Concurrent.MVar
|
||||||
import Control.Concurrent.STM
|
import Control.Concurrent.STM
|
||||||
import Control.Concurrent.STM.TChan
|
import Control.Concurrent.STM.TChan
|
||||||
import Control.Concurrent.STM.TChan
|
import Control.Concurrent.STM.TChan
|
||||||
|
@ -46,20 +47,21 @@ import Hash2Pub.RingMap
|
||||||
|
|
||||||
|
|
||||||
data PostService d = PostService
|
data PostService d = PostService
|
||||||
{ serviceConf :: ServiceConf
|
{ serviceConf :: ServiceConf
|
||||||
-- queues, other data structures
|
-- queues, other data structures
|
||||||
, baseDHT :: (DHT d) => d
|
, baseDHT :: (DHT d) => d
|
||||||
, serviceThread :: TVar ThreadId
|
, serviceThread :: TVar ThreadId
|
||||||
, subscribers :: TVar RelayTags
|
, subscribers :: TVar RelayTags
|
||||||
-- ^ for each tag store the subscribers + their queue
|
-- ^ for each tag store the subscribers + their queue
|
||||||
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
|
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
|
||||||
-- ^ tags subscribed by the own node have an assigned lease time
|
-- ^ tags subscribed by the own node have an assigned lease time
|
||||||
, ownPosts :: TVar (HSet.HashSet Txt.Text)
|
, ownPosts :: TVar (HSet.HashSet Txt.Text)
|
||||||
-- ^ just store the existence of posts for saving memory,
|
-- ^ just store the existence of posts for saving memory,
|
||||||
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
|
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
|
||||||
-- ^ Queue for processing incoming posts of own instance asynchronously
|
-- ^ Queue for processing incoming posts of own instance asynchronously
|
||||||
, postFetchQueue :: TQueue PostID
|
, postFetchQueue :: TQueue PostID
|
||||||
, httpMan :: HTTP.Manager
|
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
|
||||||
|
, httpMan :: HTTP.Manager
|
||||||
}
|
}
|
||||||
deriving (Typeable)
|
deriving (Typeable)
|
||||||
|
|
||||||
|
@ -86,6 +88,7 @@ instance DHT d => Service PostService d where
|
||||||
ownPostVar <- newTVarIO HSet.empty
|
ownPostVar <- newTVarIO HSet.empty
|
||||||
relayInQueue' <- newTQueueIO
|
relayInQueue' <- newTQueueIO
|
||||||
postFetchQueue' <- newTQueueIO
|
postFetchQueue' <- newTQueueIO
|
||||||
|
migrationsInProgress' <- newTVarIO HMap.empty
|
||||||
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
||||||
let
|
let
|
||||||
thisService = PostService {
|
thisService = PostService {
|
||||||
|
@ -97,6 +100,7 @@ instance DHT d => Service PostService d where
|
||||||
, ownPosts = ownPostVar
|
, ownPosts = ownPostVar
|
||||||
, relayInQueue = relayInQueue'
|
, relayInQueue = relayInQueue'
|
||||||
, postFetchQueue = postFetchQueue'
|
, postFetchQueue = postFetchQueue'
|
||||||
|
, migrationsInProgress = migrationsInProgress'
|
||||||
, httpMan = httpMan'
|
, httpMan = httpMan'
|
||||||
}
|
}
|
||||||
port' = fromIntegral (confServicePort conf)
|
port' = fromIntegral (confServicePort conf)
|
||||||
|
@ -117,6 +121,17 @@ instance DHT d => Service PostService d where
|
||||||
|
|
||||||
migrateData = clientDeliverSubscriptions
|
migrateData = clientDeliverSubscriptions
|
||||||
|
|
||||||
|
waitForMigrationFrom serv fromID = do
|
||||||
|
migrationSynchroniser <- atomically $ do
|
||||||
|
syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv)
|
||||||
|
maybe
|
||||||
|
-- decision: this function blocks until it gets an incoming migration from given ID
|
||||||
|
retry
|
||||||
|
pure
|
||||||
|
syncPoint
|
||||||
|
-- block until migration finished
|
||||||
|
takeMVar migrationSynchroniser
|
||||||
|
|
||||||
|
|
||||||
-- | return a WAI application
|
-- | return a WAI application
|
||||||
postServiceApplication :: DHT d => PostService d -> Application
|
postServiceApplication :: DHT d => PostService d -> Application
|
||||||
|
@ -136,7 +151,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
|
||||||
|
|
||||||
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
||||||
-- delivery endpoint of newly published posts of the relay's instance
|
-- delivery endpoint of newly published posts of the relay's instance
|
||||||
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
|
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
|
||||||
-- endpoint for delivering the subscriptions and outstanding queue
|
-- endpoint for delivering the subscriptions and outstanding queue
|
||||||
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
|
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
|
||||||
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
||||||
|
@ -194,10 +209,14 @@ newtype UnhandledTagException = UnhandledTagException String
|
||||||
|
|
||||||
instance Exception UnhandledTagException
|
instance Exception UnhandledTagException
|
||||||
|
|
||||||
subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text
|
subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
|
||||||
subscriptionDelivery serv subList = do
|
subscriptionDelivery serv senderID subList = do
|
||||||
let
|
let
|
||||||
tagSubs = Txt.lines subList
|
tagSubs = Txt.lines subList
|
||||||
|
-- signal that the migration is in progress
|
||||||
|
syncMVar <- liftIO newEmptyMVar
|
||||||
|
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
|
||||||
|
HMap.insert (fromInteger senderID) syncMVar
|
||||||
-- In favor of having the convenience of rolling back the transaction once a
|
-- In favor of having the convenience of rolling back the transaction once a
|
||||||
-- not-handled tag occurs, this results in a single large transaction.
|
-- not-handled tag occurs, this results in a single large transaction.
|
||||||
-- Hopefully the performance isn't too bad.
|
-- Hopefully the performance isn't too bad.
|
||||||
|
@ -211,6 +230,8 @@ subscriptionDelivery serv subList = do
|
||||||
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
|
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
|
||||||
-- TODO: potentially log this
|
-- TODO: potentially log this
|
||||||
:: STM (Either String ()))
|
:: STM (Either String ()))
|
||||||
|
-- TODO: should this always signal migration finished to avoid deadlocksP
|
||||||
|
liftIO $ putMVar syncMVar ()
|
||||||
case res of
|
case res of
|
||||||
Left err -> throwError err410 {errBody = BSUL.fromString err}
|
Left err -> throwError err410 {errBody = BSUL.fromString err}
|
||||||
Right _ -> pure ""
|
Right _ -> pure ""
|
||||||
|
@ -322,11 +343,12 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM
|
||||||
-- and their outstanding delivery queue to another instance.
|
-- and their outstanding delivery queue to another instance.
|
||||||
-- If the transfer succeeds, the transfered subscribers are removed from the local list.
|
-- If the transfer succeeds, the transfered subscribers are removed from the local list.
|
||||||
clientDeliverSubscriptions :: PostService d
|
clientDeliverSubscriptions :: PostService d
|
||||||
|
-> NodeID -- ^ sender node ID
|
||||||
-> NodeID -- ^ fromTag
|
-> NodeID -- ^ fromTag
|
||||||
-> NodeID -- ^ toTag
|
-> NodeID -- ^ toTag
|
||||||
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||||
-> IO (Either String ()) -- Either signals success or failure
|
-> IO (Either String ()) -- Either signals success or failure
|
||||||
clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
|
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
||||||
-- collect tag intearval
|
-- collect tag intearval
|
||||||
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
||||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
||||||
|
@ -350,7 +372,7 @@ clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
|
||||||
""
|
""
|
||||||
intervalTags
|
intervalTags
|
||||||
-- send subscribers
|
-- send subscribers
|
||||||
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
|
resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
|
||||||
-- on failure return a Left, otherwise delete subscription entry
|
-- on failure return a Left, otherwise delete subscription entry
|
||||||
case resp of
|
case resp of
|
||||||
Left err -> pure . Left . show $ err
|
Left err -> pure . Left . show $ err
|
||||||
|
|
Loading…
Reference in a new issue