Compare commits
4 commits
4302452d18
...
c49c1a89c9
Author | SHA1 | Date | |
---|---|---|---|
|
c49c1a89c9 | ||
|
414564705a | ||
|
581757965a | ||
|
470ce6f39a |
|
@ -41,13 +41,14 @@ module Hash2Pub.DHTProtocol
|
|||
)
|
||||
where
|
||||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TBQueue
|
||||
import Control.Concurrent.STM.TQueue
|
||||
import Control.Concurrent.STM.TVar
|
||||
import Control.Exception
|
||||
import Control.Monad (foldM, forM, forM_)
|
||||
import Control.Monad (foldM, forM, forM_, when)
|
||||
import qualified Data.ByteString as BS
|
||||
import Data.Either (rights)
|
||||
import Data.Foldable (foldl', foldr')
|
||||
|
@ -75,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
|
|||
LocalNodeState (..),
|
||||
LocalNodeStateSTM, NodeCache,
|
||||
NodeID, NodeState (..),
|
||||
RealNode (..),
|
||||
RealNode (..), RealNodeSTM,
|
||||
RemoteNodeState (..),
|
||||
RingEntry (..), RingMap (..),
|
||||
addRMapEntry, addRMapEntryWith,
|
||||
Service (..), addRMapEntry,
|
||||
addRMapEntryWith,
|
||||
cacheGetNodeStateUnvalidated,
|
||||
cacheLookup, cacheLookupPred,
|
||||
cacheLookupSucc, genNodeID,
|
||||
|
@ -250,7 +252,8 @@ ackRequest _ _ = Map.empty
|
|||
|
||||
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
|
||||
-- the response to be sent.
|
||||
handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node
|
||||
handleIncomingRequest :: Service s (RealNodeSTM s)
|
||||
=> LocalNodeStateSTM s -- ^ the handling node
|
||||
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
|
||||
-> SockAddr -- ^ source address of the request
|
||||
|
@ -422,10 +425,10 @@ respondPing nsSTM msgSet = do
|
|||
|
||||
-- this modifies node state, so locking and IO seems to be necessary.
|
||||
-- Still try to keep as much code as possible pure
|
||||
respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
|
||||
respondJoin nsSTM msgSet = do
|
||||
-- atomically read and modify the node state according to the parsed request
|
||||
responseMsg <- atomically $ do
|
||||
(dataMigration, responseMsg) <- atomically $ do
|
||||
nsSnap <- readTVar nsSTM
|
||||
cache <- readTVar $ nodeCacheSTM nsSnap
|
||||
let
|
||||
|
@ -455,24 +458,31 @@ respondJoin nsSTM msgSet = do
|
|||
, payload = Just responsePayload
|
||||
}
|
||||
writeTVar nsSTM joinedNS
|
||||
pure joinResponse
|
||||
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
|
||||
let
|
||||
serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
|
||||
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
|
||||
pure (Just serviceDataMigrator, joinResponse)
|
||||
-- otherwise respond with empty payload
|
||||
else pure Response {
|
||||
else pure (Nothing, Response {
|
||||
requestID = requestID aRequestPart
|
||||
, senderID = getNid nsSnap
|
||||
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
|
||||
, isFinalPart = False
|
||||
, action = Join
|
||||
, payload = Nothing
|
||||
}
|
||||
})
|
||||
|
||||
-- as DHT response is required immediately, fork the service data migration push
|
||||
-- into a new thread. That's kind of ugly but the best I can think of so far
|
||||
when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ())
|
||||
pure $ serialiseMessage sendMessageSize responseMsg
|
||||
-- TODO: notify service layer to copy over data now handled by the new joined node
|
||||
|
||||
-- ....... request sending .......
|
||||
|
||||
-- | send a join request and return the joined 'LocalNodeState' including neighbours
|
||||
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
|
||||
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
|
||||
-> LocalNodeStateSTM s -- ^ joining NodeState
|
||||
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
|
||||
requestJoin toJoinOn ownStateSTM = do
|
||||
|
@ -511,12 +521,15 @@ requestJoin toJoinOn ownStateSTM = do
|
|||
pure (cacheInsertQ, newState)
|
||||
-- execute the cache insertions
|
||||
mapM_ (\f -> f joinedState) cacheInsertQ
|
||||
pure $ if responses == Set.empty
|
||||
then Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||
if responses == Set.empty
|
||||
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
|
||||
else if null (predecessors joinedState) && null (successors joinedState)
|
||||
then Left "join error: no predecessors or successors"
|
||||
then pure $ Left "join error: no predecessors or successors"
|
||||
-- successful join
|
||||
else Right ownStateSTM
|
||||
else do
|
||||
-- wait for migration data to be completely received
|
||||
waitForMigrationFrom (nodeService prn) (getNid ownState)
|
||||
pure $ Right ownStateSTM
|
||||
)
|
||||
`catch` (\e -> pure . Left $ displayException (e :: IOException))
|
||||
|
||||
|
|
|
@ -152,7 +152,8 @@ nodeStateInit realNodeSTM = do
|
|||
|
||||
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
|
||||
-- for resolving the new node's position.
|
||||
fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
|
||||
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
|
||||
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
|
||||
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
|
||||
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
|
||||
-- successful join, otherwise an error message
|
||||
|
@ -170,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
|
|||
|
||||
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
|
||||
-- Unjoined try joining instead.
|
||||
convergenceSampleThread :: LocalNodeStateSTM s -> IO ()
|
||||
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
|
||||
convergenceSampleThread nsSTM = forever $ do
|
||||
nsSnap <- readTVarIO nsSTM
|
||||
parentNode <- readTVarIO $ parentRealNode nsSnap
|
||||
|
@ -201,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do
|
|||
|
||||
|
||||
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
|
||||
tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
|
||||
tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
|
||||
tryBootstrapJoining nsSTM = do
|
||||
bss <- atomically $ do
|
||||
nsSnap <- readTVar nsSTM
|
||||
|
@ -249,7 +250,8 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
|
|||
|
||||
-- | join a node to the DHT using the global node cache
|
||||
-- node's position.
|
||||
fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
|
||||
fediChordVserverJoin :: Service s (RealNodeSTM s)
|
||||
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
|
||||
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
|
||||
-- successful join, otherwise an error message
|
||||
fediChordVserverJoin nsSTM = do
|
||||
|
@ -293,15 +295,18 @@ fediChordVserverLeave ns = do
|
|||
successorLeave <- liftIO $ sendUntilSuccess 0
|
||||
-- trigger service data transfer for abandoned key space
|
||||
migrateToNode <- liftEither successorLeave
|
||||
let lowerKeyBound = maybe (getNid ns) getNid $ headMay (predecessors ns)
|
||||
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
|
||||
migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
|
||||
-- previously held data is the one between the immediate predecessor and
|
||||
-- the own ID
|
||||
migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
|
||||
liftEither migrationResult
|
||||
|
||||
|
||||
|
||||
-- | Wait for new cache entries to appear and then try joining on them.
|
||||
-- Exits after successful joining.
|
||||
joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO ()
|
||||
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
|
||||
joinOnNewEntriesThread nsSTM = loop
|
||||
where
|
||||
loop = do
|
||||
|
@ -576,7 +581,7 @@ sendThread sock sendQ = forever $ do
|
|||
sendAllTo sock packet addr
|
||||
|
||||
-- | Sets up and manages the main server threads of FediChord
|
||||
fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO ()
|
||||
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
|
||||
fediMainThreads sock nsSTM = do
|
||||
ns <- readTVarIO nsSTM
|
||||
putStrLn "launching threads"
|
||||
|
@ -619,7 +624,8 @@ requestMapPurge mapVar = forever $ do
|
|||
|
||||
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
|
||||
-- and pass them to their specific handling function.
|
||||
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||
fediMessageHandler :: Service s (RealNodeSTM s)
|
||||
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
|
||||
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
|
||||
-> LocalNodeStateSTM s -- ^ acting NodeState
|
||||
-> IO ()
|
||||
|
|
|
@ -426,10 +426,13 @@ class Service s d where
|
|||
getListeningPortFromService :: (Integral i) => s d -> i
|
||||
-- | trigger a service data migration of data between the two given keys
|
||||
migrateData :: s d
|
||||
-> NodeID -- ^ source/ sender node ID
|
||||
-> NodeID -- ^ start key
|
||||
-> NodeID -- ^ end key
|
||||
-> (String, Int) -- ^ hostname and port of target service
|
||||
-> IO (Either String ()) -- ^ success or failure
|
||||
-- | Wait for an incoming migration from a given node to succeed, may block forever
|
||||
waitForMigrationFrom :: s d -> NodeID -> IO ()
|
||||
|
||||
instance Hashable.Hashable NodeID where
|
||||
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
||||
|
|
|
@ -10,6 +10,7 @@ module Hash2Pub.PostService where
|
|||
|
||||
import Control.Concurrent
|
||||
import Control.Concurrent.Async
|
||||
import Control.Concurrent.MVar
|
||||
import Control.Concurrent.STM
|
||||
import Control.Concurrent.STM.TChan
|
||||
import Control.Concurrent.STM.TChan
|
||||
|
@ -59,6 +60,7 @@ data PostService d = PostService
|
|||
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
|
||||
-- ^ Queue for processing incoming posts of own instance asynchronously
|
||||
, postFetchQueue :: TQueue PostID
|
||||
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
|
||||
, httpMan :: HTTP.Manager
|
||||
}
|
||||
deriving (Typeable)
|
||||
|
@ -86,6 +88,7 @@ instance DHT d => Service PostService d where
|
|||
ownPostVar <- newTVarIO HSet.empty
|
||||
relayInQueue' <- newTQueueIO
|
||||
postFetchQueue' <- newTQueueIO
|
||||
migrationsInProgress' <- newTVarIO HMap.empty
|
||||
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
|
||||
let
|
||||
thisService = PostService {
|
||||
|
@ -97,6 +100,7 @@ instance DHT d => Service PostService d where
|
|||
, ownPosts = ownPostVar
|
||||
, relayInQueue = relayInQueue'
|
||||
, postFetchQueue = postFetchQueue'
|
||||
, migrationsInProgress = migrationsInProgress'
|
||||
, httpMan = httpMan'
|
||||
}
|
||||
port' = fromIntegral (confServicePort conf)
|
||||
|
@ -117,6 +121,17 @@ instance DHT d => Service PostService d where
|
|||
|
||||
migrateData = clientDeliverSubscriptions
|
||||
|
||||
waitForMigrationFrom serv fromID = do
|
||||
migrationSynchroniser <- atomically $ do
|
||||
syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv)
|
||||
maybe
|
||||
-- decision: this function blocks until it gets an incoming migration from given ID
|
||||
retry
|
||||
pure
|
||||
syncPoint
|
||||
-- block until migration finished
|
||||
takeMVar migrationSynchroniser
|
||||
|
||||
|
||||
-- | return a WAI application
|
||||
postServiceApplication :: DHT d => PostService d -> Application
|
||||
|
@ -136,7 +151,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
|
|||
|
||||
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
||||
-- delivery endpoint of newly published posts of the relay's instance
|
||||
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
|
||||
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
|
||||
-- endpoint for delivering the subscriptions and outstanding queue
|
||||
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
|
||||
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
||||
|
@ -194,10 +209,14 @@ newtype UnhandledTagException = UnhandledTagException String
|
|||
|
||||
instance Exception UnhandledTagException
|
||||
|
||||
subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text
|
||||
subscriptionDelivery serv subList = do
|
||||
subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
|
||||
subscriptionDelivery serv senderID subList = do
|
||||
let
|
||||
tagSubs = Txt.lines subList
|
||||
-- signal that the migration is in progress
|
||||
syncMVar <- liftIO newEmptyMVar
|
||||
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
|
||||
HMap.insert (fromInteger senderID) syncMVar
|
||||
-- In favor of having the convenience of rolling back the transaction once a
|
||||
-- not-handled tag occurs, this results in a single large transaction.
|
||||
-- Hopefully the performance isn't too bad.
|
||||
|
@ -211,6 +230,12 @@ subscriptionDelivery serv subList = do
|
|||
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
|
||||
-- TODO: potentially log this
|
||||
:: STM (Either String ()))
|
||||
-- TODO: should this always signal migration finished to avoid deadlocksP
|
||||
liftIO $ putMVar syncMVar () -- wakes up waiting thread
|
||||
liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed
|
||||
-- delete this migration from ongoing ones
|
||||
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
|
||||
HMap.delete (fromInteger senderID)
|
||||
case res of
|
||||
Left err -> throwError err410 {errBody = BSUL.fromString err}
|
||||
Right _ -> pure ""
|
||||
|
@ -322,11 +347,12 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM
|
|||
-- and their outstanding delivery queue to another instance.
|
||||
-- If the transfer succeeds, the transfered subscribers are removed from the local list.
|
||||
clientDeliverSubscriptions :: PostService d
|
||||
-> NodeID -- ^ sender node ID
|
||||
-> NodeID -- ^ fromTag
|
||||
-> NodeID -- ^ toTag
|
||||
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
||||
-> IO (Either String ()) -- Either signals success or failure
|
||||
clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
|
||||
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
|
||||
-- collect tag intearval
|
||||
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
|
||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
||||
|
@ -350,7 +376,7 @@ clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
|
|||
""
|
||||
intervalTags
|
||||
-- send subscribers
|
||||
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
|
||||
resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
|
||||
-- on failure return a Left, otherwise delete subscription entry
|
||||
case resp of
|
||||
Left err -> pure . Left . show $ err
|
||||
|
|
Loading…
Reference in a new issue