Compare commits

...

4 commits

4 changed files with 89 additions and 41 deletions

View file

@@ -41,13 +41,14 @@ module Hash2Pub.DHTProtocol
)
where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Exception
import Control.Monad (foldM, forM, forM_)
import Control.Monad (foldM, forM, forM_, when)
import qualified Data.ByteString as BS
import Data.Either (rights)
import Data.Foldable (foldl', foldr')
@@ -75,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
LocalNodeState (..),
LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..),
RealNode (..),
RealNode (..), RealNodeSTM,
RemoteNodeState (..),
RingEntry (..), RingMap (..),
addRMapEntry, addRMapEntryWith,
Service (..), addRMapEntry,
addRMapEntryWith,
cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID,
@@ -250,7 +252,8 @@ ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent.
handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node
handleIncomingRequest :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request
@@ -422,10 +425,10 @@ respondPing nsSTM msgSet = do
-- this modifies node state, so locking and IO seem to be necessary.
-- Still, try to keep as much code as possible pure
respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request
responseMsg <- atomically $ do
(dataMigration, responseMsg) <- atomically $ do
nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap
let
@@ -455,24 +458,31 @@ respondJoin nsSTM msgSet = do
, payload = Just responsePayload
}
writeTVar nsSTM joinedNS
pure joinResponse
ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
let
serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
pure (Just serviceDataMigrator, joinResponse)
-- otherwise respond with empty payload
else pure Response {
else pure (Nothing, Response {
requestID = requestID aRequestPart
, senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False
, action = Join
, payload = Nothing
}
})
-- as the DHT response is required immediately, fork the service data migration push
-- into a new thread. That's kind of ugly, but the best I can think of so far
when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ())
pure $ serialiseMessage sendMessageSize responseMsg
-- TODO: notify service layer to copy over data now handled by the new joined node
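As an aside, the isJust/fromJust pair above can be expressed totally through the Foldable instance of Maybe; a minimal sketch, assuming Control.Monad.void is in scope:

forM_ dataMigration $ \migrator ->
    void $ forkIO (void migrator)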
-- ....... request sending .......
-- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted
requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do
@@ -511,12 +521,15 @@ requestJoin toJoinOn ownStateSTM = do
pure (cacheInsertQ, newState)
-- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ
pure $ if responses == Set.empty
then Left $ "join error: got no response from " <> show (getNid toJoinOn)
if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState)
then Left "join error: no predecessors or successors"
then pure $ Left "join error: no predecessors or successors"
-- successful join
else Right ownStateSTM
else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
)
`catch` (\e -> pure . Left $ displayException (e :: IOException))
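Taken together, the two sides of the new join handshake pair up as follows (a schematic sketch of the calls above, where target abbreviates the (domain, port) pair of the joining node):

-- responding node (respondJoin): fork the data push so the DHT response
-- is not delayed by the service-layer transfer
forkIO . void $ migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) target

-- joining node (requestJoin): block until that transfer has fully arrived
waitForMigrationFrom (nodeService prn) (getNid ownState)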

View file

@@ -152,7 +152,8 @@ nodeStateInit realNodeSTM = do
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position.
fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
fediChordBootstrapJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
@@ -170,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
-- Periodically look up the own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- If not yet joined, try joining instead.
convergenceSampleThread :: LocalNodeStateSTM s -> IO ()
convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap
@@ -201,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining nsSTM = do
bss <- atomically $ do
nsSnap <- readTVar nsSTM
@@ -249,7 +250,8 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
-- | join a node to the DHT using the global node cache for resolving the new
-- node's position.
fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState'
fediChordVserverJoin :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message
fediChordVserverJoin nsSTM = do
@@ -293,15 +295,18 @@ fediChordVserverLeave ns = do
successorLeave <- liftIO $ sendUntilSuccess 0
-- trigger service data transfer for abandoned key space
migrateToNode <- liftEither successorLeave
let lowerKeyBound = maybe (getNid ns) getNid $ headMay (predecessors ns)
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
-- the previously held data is the key range between the immediate predecessor
-- and the node's own ID
migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
liftEither migrationResult
-- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread nsSTM = loop
where
loop = do
@@ -576,7 +581,7 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM
putStrLn "launching threads"
@@ -619,7 +624,8 @@ requestMapPurge mapVar = forever $ do
-- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue
fediMessageHandler :: Service s (RealNodeSTM s)
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM s -- ^ acting NodeState
-> IO ()

View file

@@ -426,10 +426,13 @@ class Service s d where
getListeningPortFromService :: (Integral i) => s d -> i
-- | trigger a migration of service data between the two given keys
migrateData :: s d
-> NodeID -- ^ source/sender node ID
-> NodeID -- ^ start key
-> NodeID -- ^ end key
-> (String, Int) -- ^ hostname and port of target service
-> IO (Either String ()) -- ^ success or failure
-- | Wait for an incoming migration from a given node to succeed; may block forever
waitForMigrationFrom :: s d -> NodeID -> IO ()
instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
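For a service holding no migratable state, both new methods can be satisfied trivially. A hypothetical sketch (DummyService is not part of this changeset) illustrating that waitForMigrationFrom is also allowed to return immediately:

instance DHT d => Service DummyService d where
    migrateData _ _ _ _ _    = pure (Right ()) -- nothing to transfer
    waitForMigrationFrom _ _ = pure ()         -- nothing to wait for
    -- remaining class methods elided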

View file

@@ -10,6 +10,7 @@ module Hash2Pub.PostService where
import Control.Concurrent
import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
@@ -59,6 +60,7 @@ data PostService d = PostService
, relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID
, migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
}
deriving (Typeable)
@@ -86,6 +88,7 @@ instance DHT d => Service PostService d where
ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
let
thisService = PostService {
@@ -97,6 +100,7 @@ instance DHT d => Service PostService d where
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan'
}
port' = fromIntegral (confServicePort conf)
@@ -117,6 +121,17 @@ instance DHT d => Service PostService d where
migrateData = clientDeliverSubscriptions
waitForMigrationFrom serv fromID = do
migrationSynchroniser <- atomically $ do
syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv)
maybe
-- decision: this function blocks until it gets an incoming migration from the given ID
retry
pure
syncPoint
-- block until migration finished
takeMVar migrationSynchroniser
-- | return a WAI application
postServiceApplication :: DHT d => PostService d -> Application
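The waitForMigrationFrom implementation above combines STM retry with an MVar: the transaction blocks until the sending node has registered its synchronisation MVar in migrationsInProgress, and the subsequent takeMVar blocks until the sender signals completion. The generic shape of that pattern as a self-contained sketch (waitOn is a made-up name):

import Control.Concurrent.MVar (MVar, takeMVar)
import Control.Concurrent.STM (TVar, atomically, readTVar, retry)
import Data.Hashable (Hashable)
import qualified Data.HashMap.Strict as HMap

-- block until the key appears in the map, then wait on its MVar
waitOn :: (Eq k, Hashable k) => TVar (HMap.HashMap k (MVar ())) -> k -> IO ()
waitOn mapVar key = do
    sync <- atomically $ maybe retry pure . HMap.lookup key =<< readTVar mapVar
    takeMVar sync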
@@ -136,7 +151,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid
@@ -194,10 +209,14 @@ newtype UnhandledTagException = UnhandledTagException String
instance Exception UnhandledTagException
subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv subList = do
subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv senderID subList = do
let
tagSubs = Txt.lines subList
-- signal that the migration is in progress
syncMVar <- liftIO newEmptyMVar
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.insert (fromInteger senderID) syncMVar
-- For the convenience of being able to roll back the whole transaction once an
-- unhandled tag occurs, this is done in a single large transaction.
-- Hopefully the performance isn't too bad.
@@ -211,6 +230,12 @@ subscriptionDelivery serv subList = do
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
-- TODO: potentially log this
:: STM (Either String ()))
-- TODO: should this always signal migration finished to avoid deadlocks?
liftIO $ putMVar syncMVar () -- wakes up waiting thread
liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed
-- delete this migration from ongoing ones
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.delete (fromInteger senderID)
case res of
Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure ""
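The double putMVar above forms a rendezvous: the first put wakes the thread blocked in takeMVar, and the second put can only succeed once that takeMVar has emptied the MVar again, i.e. once the waiter has actually resumed. A minimal standalone demonstration of the pattern (illustrative only, not patch code):

import Control.Concurrent (forkIO)
import Control.Concurrent.MVar

main :: IO ()
main = do
    sync <- newEmptyMVar
    _ <- forkIO $ do
        takeMVar sync -- waiter: blocks until the first put
        putStrLn "migration received"
    putMVar sync () -- wakes up the waiting thread
    putMVar sync () -- blocks until the waiter has resumed
    putStrLn "safe to drop the map entry"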
@@ -322,11 +347,12 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM
-- and their outstanding delivery queue to another instance.
-- If the transfer succeeds, the transferred subscribers are removed from the local list.
clientDeliverSubscriptions :: PostService d
-> NodeID -- ^ sender node ID
-> NodeID -- ^ start key
-> NodeID -- ^ end key
-> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag interval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
@@ -350,7 +376,7 @@ clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
""
intervalTags
-- send subscribers
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
-- on failure return a Left, otherwise delete subscription entry
case resp of
Left err -> pure . Left . show $ err
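Since the instance defines migrateData = clientDeliverSubscriptions, DHT-side callers now pass the sending node's ID ahead of the key range. A hedged usage sketch (host name and port are placeholders):

res <- migrateData serv (getNid ns) lowerKeyBound (getNid target) ("relay.example.org", 3342)
either (putStrLn . ("migration failed: " <>)) pure res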