diff --git a/Hash2Pub.cabal b/Hash2Pub.cabal index 54cb29d..56441ad 100644 --- a/Hash2Pub.cabal +++ b/Hash2Pub.cabal @@ -55,7 +55,7 @@ library import: deps -- Modules exported by the library. - exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap + exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap -- Modules included in this library but not exported. other-modules: Hash2Pub.Utils diff --git a/app/Main.hs b/app/Main.hs index 98961c0..8887ee8 100644 --- a/app/Main.hs +++ b/app/Main.hs @@ -10,17 +10,15 @@ import Data.IP (IPv6, toHostAddress6) import System.Environment import Hash2Pub.FediChord -import Hash2Pub.FediChordTypes -import Hash2Pub.PostService (PostService (..)) main :: IO () main = do -- ToDo: parse and pass config -- probably use `tomland` for that - (fConf, sConf) <- readConfig + conf <- readConfig -- TODO: first initialise 'RealNode', then the vservers -- ToDo: load persisted caches, bootstrapping nodes … - (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d)) + (serverSock, thisNode) <- fediChordInit conf -- currently no masking is necessary, as there is nothing to clean up nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode -- try joining the DHT using one of the provided bootstrapping nodes @@ -43,11 +41,10 @@ main = do pure () -readConfig :: IO (FediChordConf, ServiceConf) +readConfig :: IO FediChordConf readConfig = do - confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs - let - fConf = FediChordConf { + confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs + pure $ FediChordConf { confDomain = confDomainString , confIP = toHostAddress6 . read $ ipString , confDhtPort = read portString @@ -56,10 +53,3 @@ readConfig = do , confBootstrapSamplingInterval = 180 , confMaxLookupCacheAge = 300 } - sConf = ServiceConf { - confSubscriptionExpiryTime = 2*3600 `div` read speedup - , confServicePort = read servicePortString - , confServiceHost = confDomainString - } - pure (fConf, sConf) - diff --git a/src/Hash2Pub/FediChord.hs b/src/Hash2Pub/FediChord.hs index 70c9ff7..26a373c 100644 --- a/src/Hash2Pub/FediChord.hs +++ b/src/Hash2Pub/FediChord.hs @@ -95,23 +95,16 @@ import Debug.Trace (trace) -- | initialise data structures, compute own IDs and bind to listening socket -- ToDo: load persisted state, thus this function already operates in IO -fediChordInit :: (Service s RealNodeSTM) - => FediChordConf - -> (RealNodeSTM -> IO (s RealNodeSTM)) -- ^ runner function for service - -> IO (Socket, LocalNodeStateSTM) -fediChordInit initConf serviceRunner = do +fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM) +fediChordInit initConf = do emptyLookupCache <- newTVarIO Map.empty let realNode = RealNode { vservers = [] , nodeConfig = initConf , bootstrapNodes = confBootstrapNodes initConf , lookupCacheSTM = emptyLookupCache - --, service = undefined } realNodeSTM <- newTVarIO realNode - -- launch service and set the reference in the RealNode - serv <- serviceRunner realNodeSTM - --atomically . writeTVar $ realNode { service = serv } initialState <- nodeStateInit realNodeSTM initialStateSTM <- newTVarIO initialState serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState) diff --git a/src/Hash2Pub/FediChordTypes.hs b/src/Hash2Pub/FediChordTypes.hs index 604519e..d764b71 100644 --- a/src/Hash2Pub/FediChordTypes.hs +++ b/src/Hash2Pub/FediChordTypes.hs @@ -58,14 +58,11 @@ module Hash2Pub.FediChordTypes ( , bsAsIpAddr , FediChordConf(..) , DHT(..) - , Service(..) - , ServiceConf(..) ) where import Control.Exception import Data.Foldable (foldr') import Data.Function (on) -import qualified Data.Hashable as Hashable import Data.List (delete, nub, sortBy) import qualified Data.Map.Strict as Map import Data.Maybe (fromJust, fromMaybe, isJust, @@ -147,7 +144,6 @@ a `localCompare` b -- | Data for managing the virtual server nodes of this real node. -- Also contains shared data and config values. -- TODO: more data structures for k-choices bookkeeping ---data RealNode s = RealNode data RealNode = RealNode { vservers :: [LocalNodeStateSTM] -- ^ references to all active versers @@ -159,7 +155,6 @@ data RealNode = RealNode -- ^ a global cache of looked up keys and their associated nodes } ---type RealNodeSTM s = TVar (RealNode s) type RealNodeSTM = TVar RealNode -- | represents a node and all its important state @@ -416,26 +411,6 @@ data FediChordConf = FediChordConf } deriving (Show, Eq) --- ====== Service Types ============ - -class Service s d where - -- | run the service - runService :: ServiceConf -> d -> IO (s d) - getServicePort' :: (Integral i) => s d -> i - -instance Hashable.Hashable NodeID where - hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID - hash = Hashable.hash . getNodeID - -data ServiceConf = ServiceConf - { confSubscriptionExpiryTime :: Integer - -- ^ subscription lease expiration in seconds - , confServicePort :: Int - -- ^ listening port for service - , confServiceHost :: String - -- ^ hostname of service - } - class DHT d where -- | lookup the responsible host handling a given key string, -- possiblggy from a lookup cache diff --git a/src/Hash2Pub/PostService.hs b/src/Hash2Pub/PostService.hs index 264bccb..ef22e29 100644 --- a/src/Hash2Pub/PostService.hs +++ b/src/Hash2Pub/PostService.hs @@ -14,7 +14,7 @@ import Control.Concurrent.STM import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TVar -import Control.Monad (foldM, forM_, forever) +import Control.Monad (forM_, forever) import Control.Monad.IO.Class (liftIO) import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.HashMap.Strict as HMap @@ -32,10 +32,12 @@ import Servant import Hash2Pub.FediChordTypes import Hash2Pub.RingMap +import Hash2Pub.ServiceTypes data PostService d = PostService - { serviceConf :: ServiceConf + { psPort :: Warp.Port + , psHost :: String -- queues, other data structures , baseDHT :: (DHT d) => d , serviceThread :: TVar ThreadId @@ -54,17 +56,15 @@ type PostID = Txt.Text type PostContent = Txt.Text -- | For each handled tag, store its subscribers and provide a -- broadcast 'TChan' for enqueuing posts -type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag) -type TagSubscribersSTM = TVar TagSubscribers +type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag) -- | each subscriber is identified by its contact data "hostname" "port" -- and holds a TChan duplicated from the broadcast TChan of the tag --- + an expiration timestamp -type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime)) +type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID)) instance DHT d => Service PostService d where -- | initialise 'PostService' data structures and run server - runService conf dht = do + runService dht host port = do -- create necessary TVars threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder subscriberVar <- newTVarIO emptyRMap @@ -73,7 +73,8 @@ instance DHT d => Service PostService d where relayInQueue' <- newTQueueIO let thisService = PostService { - serviceConf = conf + psPort = port' + , psHost = host , baseDHT = dht , serviceThread = threadVar , subscribers = subscriberVar @@ -81,8 +82,8 @@ instance DHT d => Service PostService d where , ownPosts = ownPostVar , relayInQueue = relayInQueue' } - port' = fromIntegral (confServicePort conf) - warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings + port' = fromIntegral port + warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings -- Run 'concurrently_' from another thread to be able to return the -- 'PostService'. -- Terminating that parent thread will make all child threads terminate as well. @@ -95,7 +96,7 @@ instance DHT d => Service PostService d where atomically $ writeTVar threadVar servThreadID pure thisService - getServicePort' = fromIntegral . confServicePort . serviceConf + getServicePort s = fromIntegral $ psPort s -- | return a WAI application @@ -114,7 +115,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB -- ========= HTTP API and handlers ============= -type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent +type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent -- ^ endpoint for delivering the subscriptions and outstanding queue @@ -122,8 +123,6 @@ type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBod -- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text -- ^ endpoint for fetching multiple posts at once - :<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent - -- ^ delivery endpoint of newly published posts of the relay's instance :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text -- ^ delivery endpoint for posts of $tag at subscribing instance :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer @@ -140,28 +139,28 @@ postServer service = relayInbox service :<|> subscriptionDelivery service :<|> postFetch service :<|> postMultiFetch service - :<|> postInbox service :<|> tagDelivery service :<|> tagSubscribe service :<|> tagUnsubscribe service -relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent -relayInbox serv tag posts = do +relayInbox :: PostService d -> Txt.Text -> Handler NoContent +relayInbox serv post = do + -- extract contained hashtags let - -- skip checking whether the post actually contains the tag, just drop full post - postIDs = head . Txt.splitOn "," <$> Txt.lines posts - broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag - -- if tag is not in own responsibility, return a 410 Gone - maybe - (throwError $ err410 { errBody = "Relay is not responsible for this tag"}) - -- otherwise enqueue posts into broadcast queue of the tag - (\queue -> - liftIO $ forM_ postIDs (atomically . writeTChan queue) - ) - broadcastChan + containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post + -- generate post ID + postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer) + -- add ID to own posts + liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) + -- enqueue a relay job for each tag + liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> + atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) + ) pure NoContent + + subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent subscriptionDelivery serv subList = do let @@ -173,12 +172,10 @@ subscriptionDelivery serv subList = do processTag :: TVar RelayTags -> Txt.Text -> IO () processTag subscriberSTM tagData = do let - tag:subText:lease:posts:_ = Txt.splitOn "," tagData - -- ignore checking of lease time - leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer) + tag:subText:posts:_ = Txt.splitOn "," tagData sub = read . Txt.unpack $ subText :: (String, Int) postList = Txt.words posts - enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime + enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList postFetch :: PostService d -> Txt.Text -> Handler Txt.Text @@ -189,34 +186,9 @@ postFetch serv postID = do then pure placeholderPost else throwError $ err404 { errBody = "No post found with this ID" } - postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text -postMultiFetch serv postIDs = do - let idList = Txt.lines postIDs - postSet <- liftIO . readTVarIO . ownPosts $ serv - -- look up existence of all given post IDs, fail if even one is missing - foldM (\response postID -> - if HSet.member postID postSet - then pure $ placeholderPost <> "\n" <> response - else throwError $ err404 { errBody = "No post found with this ID" } - ) "" idList - - -postInbox :: PostService d -> Txt.Text -> Handler NoContent -postInbox serv post = do - -- extract contained hashtags - let - containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post - -- generate post ID - postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer) - -- add ID to own posts - liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) - -- enqueue a relay job for each tag - liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> - atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) - ) - pure NoContent - +postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: " + <> (Txt.unwords . Txt.lines $ postIDs) tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts @@ -232,61 +204,39 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro -- | Write all pending posts of a subscriber-tag-combination to its queue. -- Sets up all necessary data structures if they are still missing. -enqueueSubscription :: TVar RelayTags -- tag-subscriber map +enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map -> Hashtag -- hashtag of pending posts -> (String, Int) -- subscriber's connection information -> [PostID] -- pending posts - -> POSIXTime -- lease expiry time -> IO () -enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do +enqueueSubscriptions tagMapSTM tag subscriber posts = do -- get the tag output queue and, if necessary, create it - subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime + subChan <- atomically setupSubscriberChannel forM_ posts (atomically . writeTChan subChan) - - --- | STM operation to return the outgoing post queue of a tag to a specified subscriber. --- If the queue doesn't exist yet, all necessary data structures are set up accordingly. -setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID) -setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do - tagMap <- readTVar tagMapSTM - case lookupRelayTags tag tagMap of - Nothing -> do - -- if no collision/ tag doesn't exist yet, just initialize a - -- new subscriber map - broadcastChan <- newBroadcastTChan - tagOutChan <- dupTChan broadcastChan - newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime) - writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap - pure tagOutChan - Just (foundSubMapSTM, broadcastChan, _) -> do - -- otherwise use the existing subscriber map - foundSubMap <- readTVar foundSubMapSTM - case HMap.lookup subscriber foundSubMap of - Nothing -> do - -- for new subscribers, create new output channel - tagOutChan <- dupTChan broadcastChan - writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap - pure tagOutChan - -- existing subscriber's channels are just returned - Just (tagOutChan, _) -> pure tagOutChan - - --- | returns the broadcast channel of a hashtag if there are any subscribers to it -getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID)) -getTagBroadcastChannel serv tag = do - tagMap <- readTVar $ subscribers serv - case lookupRelayTags tag tagMap of - Nothing -> pure Nothing - Just (subscriberSTM, broadcastChan, _) -> do - subscriberMap <- readTVar subscriberSTM - if HMap.null subscriberMap - then pure Nothing - else pure (Just broadcastChan) - - --- | look up the subscription data of a tag -lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag) -lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag) + where + setupSubscriberChannel :: STM (TChan PostID) + setupSubscriberChannel = do + tagMap <- readTVar tagMapSTM + case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of + Nothing -> do + -- if no collision/ tag doesn't exist yet, just initialize a + -- new subscriber map + broadcastChan <- newBroadcastTChan + tagOutChan <- dupTChan broadcastChan + newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan + writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap + pure tagOutChan + Just (foundSubMapSTM, broadcastChan, _) -> do + -- otherwise use the existing subscriber map + foundSubMap <- readTVar foundSubMapSTM + case HMap.lookup subscriber foundSubMap of + Nothing -> do + -- for new subscribers, create new output channel + tagOutChan <- dupTChan broadcastChan + writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap + pure tagOutChan + -- existing subscriber's channels are just returned + Just tagOutChan -> pure tagOutChan -- normalise the unicode representation of a string to NFC diff --git a/src/Hash2Pub/ServiceTypes.hs b/src/Hash2Pub/ServiceTypes.hs new file mode 100644 index 0000000..5e2b37c --- /dev/null +++ b/src/Hash2Pub/ServiceTypes.hs @@ -0,0 +1,15 @@ +{-# LANGUAGE MultiParamTypeClasses #-} +module Hash2Pub.ServiceTypes where + +import Data.Hashable (Hashable (..)) + +import Hash2Pub.FediChord (DHT (..), NodeID (..)) + +class Service s d where + -- | run the service + runService :: (Integral i) => d -> String -> i -> IO (s d) + getServicePort :: (Integral i) => s d -> i + +instance Hashable NodeID where + hashWithSalt salt = hashWithSalt salt . getNodeID + hash = hash . getNodeID