Compare commits

...

5 commits

6 changed files with 158 additions and 81 deletions

View file

@ -55,7 +55,7 @@ library
import: deps import: deps
-- Modules exported by the library. -- Modules exported by the library.
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
-- Modules included in this library but not exported. -- Modules included in this library but not exported.
other-modules: Hash2Pub.Utils other-modules: Hash2Pub.Utils

View file

@ -10,15 +10,17 @@ import Data.IP (IPv6, toHostAddress6)
import System.Environment import System.Environment
import Hash2Pub.FediChord import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService (PostService (..))
main :: IO () main :: IO ()
main = do main = do
-- ToDo: parse and pass config -- ToDo: parse and pass config
-- probably use `tomland` for that -- probably use `tomland` for that
conf <- readConfig (fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers -- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes … -- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit conf (serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- currently no masking is necessary, as there is nothing to clean up -- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes -- try joining the DHT using one of the provided bootstrapping nodes
@ -41,10 +43,11 @@ main = do
pure () pure ()
readConfig :: IO FediChordConf readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do readConfig = do
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
pure $ FediChordConf { let
fConf = FediChordConf {
confDomain = confDomainString confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString , confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString , confDhtPort = read portString
@ -53,3 +56,10 @@ readConfig = do
, confBootstrapSamplingInterval = 180 , confBootstrapSamplingInterval = 180
, confMaxLookupCacheAge = 300 , confMaxLookupCacheAge = 300
} }
sConf = ServiceConf {
confSubscriptionExpiryTime = 2*3600 `div` read speedup
, confServicePort = read servicePortString
, confServiceHost = confDomainString
}
pure (fConf, sConf)

View file

@ -95,16 +95,23 @@ import Debug.Trace (trace)
-- | initialise data structures, compute own IDs and bind to listening socket -- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO -- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM) fediChordInit :: (Service s RealNodeSTM)
fediChordInit initConf = do => FediChordConf
-> (RealNodeSTM -> IO (s RealNodeSTM)) -- ^ runner function for service
-> IO (Socket, LocalNodeStateSTM)
fediChordInit initConf serviceRunner = do
emptyLookupCache <- newTVarIO Map.empty emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode { let realNode = RealNode {
vservers = [] vservers = []
, nodeConfig = initConf , nodeConfig = initConf
, bootstrapNodes = confBootstrapNodes initConf , bootstrapNodes = confBootstrapNodes initConf
, lookupCacheSTM = emptyLookupCache , lookupCacheSTM = emptyLookupCache
--, service = undefined
} }
realNodeSTM <- newTVarIO realNode realNodeSTM <- newTVarIO realNode
-- launch service and set the reference in the RealNode
serv <- serviceRunner realNodeSTM
--atomically . writeTVar $ realNode { service = serv }
initialState <- nodeStateInit realNodeSTM initialState <- nodeStateInit realNodeSTM
initialStateSTM <- newTVarIO initialState initialStateSTM <- newTVarIO initialState
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState) serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)

View file

@ -58,11 +58,14 @@ module Hash2Pub.FediChordTypes (
, bsAsIpAddr , bsAsIpAddr
, FediChordConf(..) , FediChordConf(..)
, DHT(..) , DHT(..)
, Service(..)
, ServiceConf(..)
) where ) where
import Control.Exception import Control.Exception
import Data.Foldable (foldr') import Data.Foldable (foldr')
import Data.Function (on) import Data.Function (on)
import qualified Data.Hashable as Hashable
import Data.List (delete, nub, sortBy) import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust, import Data.Maybe (fromJust, fromMaybe, isJust,
@ -144,6 +147,7 @@ a `localCompare` b
-- | Data for managing the virtual server nodes of this real node. -- | Data for managing the virtual server nodes of this real node.
-- Also contains shared data and config values. -- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping -- TODO: more data structures for k-choices bookkeeping
--data RealNode s = RealNode
data RealNode = RealNode data RealNode = RealNode
{ vservers :: [LocalNodeStateSTM] { vservers :: [LocalNodeStateSTM]
-- ^ references to all active versers -- ^ references to all active versers
@ -155,6 +159,7 @@ data RealNode = RealNode
-- ^ a global cache of looked up keys and their associated nodes -- ^ a global cache of looked up keys and their associated nodes
} }
--type RealNodeSTM s = TVar (RealNode s)
type RealNodeSTM = TVar RealNode type RealNodeSTM = TVar RealNode
-- | represents a node and all its important state -- | represents a node and all its important state
@ -411,6 +416,26 @@ data FediChordConf = FediChordConf
} }
deriving (Show, Eq) deriving (Show, Eq)
-- ====== Service Types ============
class Service s d where
-- | run the service
runService :: ServiceConf -> d -> IO (s d)
getServicePort' :: (Integral i) => s d -> i
instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
hash = Hashable.hash . getNodeID
data ServiceConf = ServiceConf
{ confSubscriptionExpiryTime :: Integer
-- ^ subscription lease expiration in seconds
, confServicePort :: Int
-- ^ listening port for service
, confServiceHost :: String
-- ^ hostname of service
}
class DHT d where class DHT d where
-- | lookup the responsible host handling a given key string, -- | lookup the responsible host handling a given key string,
-- possiblggy from a lookup cache -- possiblggy from a lookup cache

View file

@ -14,7 +14,7 @@ import Control.Concurrent.STM
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Monad (forM_, forever) import Control.Monad (foldM, forM_, forever)
import Control.Monad.IO.Class (liftIO) import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.UTF8 as BSU import qualified Data.ByteString.Lazy.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
@ -32,12 +32,10 @@ import Servant
import Hash2Pub.FediChordTypes import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap import Hash2Pub.RingMap
import Hash2Pub.ServiceTypes
data PostService d = PostService data PostService d = PostService
{ psPort :: Warp.Port { serviceConf :: ServiceConf
, psHost :: String
-- queues, other data structures -- queues, other data structures
, baseDHT :: (DHT d) => d , baseDHT :: (DHT d) => d
, serviceThread :: TVar ThreadId , serviceThread :: TVar ThreadId
@ -56,15 +54,17 @@ type PostID = Txt.Text
type PostContent = Txt.Text type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a -- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts -- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag) type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
type TagSubscribersSTM = TVar TagSubscribers
-- | each subscriber is identified by its contact data "hostname" "port" -- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag -- and holds a TChan duplicated from the broadcast TChan of the tag
type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID)) -- + an expiration timestamp
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
instance DHT d => Service PostService d where instance DHT d => Service PostService d where
-- | initialise 'PostService' data structures and run server -- | initialise 'PostService' data structures and run server
runService dht host port = do runService conf dht = do
-- create necessary TVars -- create necessary TVars
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap subscriberVar <- newTVarIO emptyRMap
@ -73,8 +73,7 @@ instance DHT d => Service PostService d where
relayInQueue' <- newTQueueIO relayInQueue' <- newTQueueIO
let let
thisService = PostService { thisService = PostService {
psPort = port' serviceConf = conf
, psHost = host
, baseDHT = dht , baseDHT = dht
, serviceThread = threadVar , serviceThread = threadVar
, subscribers = subscriberVar , subscribers = subscriberVar
@ -82,8 +81,8 @@ instance DHT d => Service PostService d where
, ownPosts = ownPostVar , ownPosts = ownPostVar
, relayInQueue = relayInQueue' , relayInQueue = relayInQueue'
} }
port' = fromIntegral port port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- Run 'concurrently_' from another thread to be able to return the -- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'. -- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well. -- Terminating that parent thread will make all child threads terminate as well.
@ -96,7 +95,7 @@ instance DHT d => Service PostService d where
atomically $ writeTVar threadVar servThreadID atomically $ writeTVar threadVar servThreadID
pure thisService pure thisService
getServicePort s = fromIntegral $ psPort s getServicePort' = fromIntegral . confServicePort . serviceConf
-- | return a WAI application -- | return a WAI application
@ -115,7 +114,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers ============= -- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance -- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent :<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
-- ^ endpoint for delivering the subscriptions and outstanding queue -- ^ endpoint for delivering the subscriptions and outstanding queue
@ -123,6 +122,8 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Put
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid -- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text :<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for fetching multiple posts at once -- ^ endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text :<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint for posts of $tag at subscribing instance -- ^ delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer :<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
@ -139,28 +140,28 @@ postServer service = relayInbox service
:<|> subscriptionDelivery service :<|> subscriptionDelivery service
:<|> postFetch service :<|> postFetch service
:<|> postMultiFetch service :<|> postMultiFetch service
:<|> postInbox service
:<|> tagDelivery service :<|> tagDelivery service
:<|> tagSubscribe service :<|> tagSubscribe service
:<|> tagUnsubscribe service :<|> tagUnsubscribe service
relayInbox :: PostService d -> Txt.Text -> Handler NoContent relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent
relayInbox serv post = do relayInbox serv tag posts = do
-- extract contained hashtags
let let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post -- skip checking whether the post actually contains the tag, just drop full post
-- generate post ID postIDs = head . Txt.splitOn "," <$> Txt.lines posts
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer) broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
-- add ID to own posts -- if tag is not in own responsibility, return a 410 Gone
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId) maybe
-- enqueue a relay job for each tag (throwError $ err410 { errBody = "Relay is not responsible for this tag"})
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag -> -- otherwise enqueue posts into broadcast queue of the tag
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post) (\queue ->
liftIO $ forM_ postIDs (atomically . writeTChan queue)
) )
broadcastChan
pure NoContent pure NoContent
subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent
subscriptionDelivery serv subList = do subscriptionDelivery serv subList = do
let let
@ -172,10 +173,12 @@ subscriptionDelivery serv subList = do
processTag :: TVar RelayTags -> Txt.Text -> IO () processTag :: TVar RelayTags -> Txt.Text -> IO ()
processTag subscriberSTM tagData = do processTag subscriberSTM tagData = do
let let
tag:subText:posts:_ = Txt.splitOn "," tagData tag:subText:lease:posts:_ = Txt.splitOn "," tagData
-- ignore checking of lease time
leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer)
sub = read . Txt.unpack $ subText :: (String, Int) sub = read . Txt.unpack $ subText :: (String, Int)
postList = Txt.words posts postList = Txt.words posts
enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
@ -186,9 +189,34 @@ postFetch serv postID = do
then pure placeholderPost then pure placeholderPost
else throwError $ err404 { errBody = "No post found with this ID" } else throwError $ err404 { errBody = "No post found with this ID" }
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: " postMultiFetch serv postIDs = do
<> (Txt.unwords . Txt.lines $ postIDs) let idList = Txt.lines postIDs
postSet <- liftIO . readTVarIO . ownPosts $ serv
-- look up existence of all given post IDs, fail if even one is missing
foldM (\response postID ->
if HSet.member postID postSet
then pure $ placeholderPost <> "\n" <> response
else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList
postInbox :: PostService d -> Txt.Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
@ -204,26 +232,30 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro
-- | Write all pending posts of a subscriber-tag-combination to its queue. -- | Write all pending posts of a subscriber-tag-combination to its queue.
-- Sets up all necessary data structures if they are still missing. -- Sets up all necessary data structures if they are still missing.
enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map enqueueSubscription :: TVar RelayTags -- tag-subscriber map
-> Hashtag -- hashtag of pending posts -> Hashtag -- hashtag of pending posts
-> (String, Int) -- subscriber's connection information -> (String, Int) -- subscriber's connection information
-> [PostID] -- pending posts -> [PostID] -- pending posts
-> POSIXTime -- lease expiry time
-> IO () -> IO ()
enqueueSubscriptions tagMapSTM tag subscriber posts = do enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
-- get the tag output queue and, if necessary, create it -- get the tag output queue and, if necessary, create it
subChan <- atomically setupSubscriberChannel subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime
forM_ posts (atomically . writeTChan subChan) forM_ posts (atomically . writeTChan subChan)
where
setupSubscriberChannel :: STM (TChan PostID)
setupSubscriberChannel = do -- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
-- If the queue doesn't exist yet, all necessary data structures are set up accordingly.
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM tagMap <- readTVar tagMapSTM
case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of case lookupRelayTags tag tagMap of
Nothing -> do Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a -- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map -- new subscriber map
broadcastChan <- newBroadcastTChan broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do Just (foundSubMapSTM, broadcastChan, _) -> do
@ -233,10 +265,28 @@ enqueueSubscriptions tagMapSTM tag subscriber posts = do
Nothing -> do Nothing -> do
-- for new subscribers, create new output channel -- for new subscribers, create new output channel
tagOutChan <- dupTChan broadcastChan tagOutChan <- dupTChan broadcastChan
writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap
pure tagOutChan pure tagOutChan
-- existing subscriber's channels are just returned -- existing subscriber's channels are just returned
Just tagOutChan -> pure tagOutChan Just (tagOutChan, _) -> pure tagOutChan
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv
case lookupRelayTags tag tagMap of
Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM
if HMap.null subscriberMap
then pure Nothing
else pure (Just broadcastChan)
-- | look up the subscription data of a tag
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag)
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC -- normalise the unicode representation of a string to NFC

View file

@ -1,15 +0,0 @@
{-# LANGUAGE MultiParamTypeClasses #-}
module Hash2Pub.ServiceTypes where
import Data.Hashable (Hashable (..))
import Hash2Pub.FediChord (DHT (..), NodeID (..))
class Service s d where
-- | run the service
runService :: (Integral i) => d -> String -> i -> IO (s d)
getServicePort :: (Integral i) => s d -> i
instance Hashable NodeID where
hashWithSalt salt = hashWithSalt salt . getNodeID
hash = hash . getNodeID