Compare commits

...

5 commits

6 changed files with 158 additions and 81 deletions

View file

@ -55,7 +55,7 @@ library
import: deps
-- Modules exported by the library.
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
-- Modules included in this library but not exported.
other-modules: Hash2Pub.Utils

View file

@ -10,15 +10,17 @@ import Data.IP (IPv6, toHostAddress6)
import System.Environment
import Hash2Pub.FediChord
import Hash2Pub.FediChordTypes
import Hash2Pub.PostService (PostService (..))
main :: IO ()
main = do
-- ToDo: parse and pass config
-- probably use `tomland` for that
conf <- readConfig
(fConf, sConf) <- readConfig
-- TODO: first initialise 'RealNode', then the vservers
-- ToDo: load persisted caches, bootstrapping nodes …
(serverSock, thisNode) <- fediChordInit conf
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
-- currently no masking is necessary, as there is nothing to clean up
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
-- try joining the DHT using one of the provided bootstrapping nodes
@ -41,10 +43,11 @@ main = do
pure ()
readConfig :: IO FediChordConf
readConfig :: IO (FediChordConf, ServiceConf)
readConfig = do
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
pure $ FediChordConf {
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
let
fConf = FediChordConf {
confDomain = confDomainString
, confIP = toHostAddress6 . read $ ipString
, confDhtPort = read portString
@ -53,3 +56,10 @@ readConfig = do
, confBootstrapSamplingInterval = 180
, confMaxLookupCacheAge = 300
}
sConf = ServiceConf {
confSubscriptionExpiryTime = 2*3600 `div` read speedup
, confServicePort = read servicePortString
, confServiceHost = confDomainString
}
pure (fConf, sConf)

View file

@ -95,16 +95,23 @@ import Debug.Trace (trace)
-- | initialise data structures, compute own IDs and bind to listening socket
-- ToDo: load persisted state, thus this function already operates in IO
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
fediChordInit initConf = do
fediChordInit :: (Service s RealNodeSTM)
=> FediChordConf
-> (RealNodeSTM -> IO (s RealNodeSTM)) -- ^ runner function for service
-> IO (Socket, LocalNodeStateSTM)
fediChordInit initConf serviceRunner = do
emptyLookupCache <- newTVarIO Map.empty
let realNode = RealNode {
vservers = []
, nodeConfig = initConf
, bootstrapNodes = confBootstrapNodes initConf
, lookupCacheSTM = emptyLookupCache
--, service = undefined
}
realNodeSTM <- newTVarIO realNode
-- launch service and set the reference in the RealNode
serv <- serviceRunner realNodeSTM
--atomically . writeTVar $ realNode { service = serv }
initialState <- nodeStateInit realNodeSTM
initialStateSTM <- newTVarIO initialState
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)

View file

@ -58,11 +58,14 @@ module Hash2Pub.FediChordTypes (
, bsAsIpAddr
, FediChordConf(..)
, DHT(..)
, Service(..)
, ServiceConf(..)
) where
import Control.Exception
import Data.Foldable (foldr')
import Data.Function (on)
import qualified Data.Hashable as Hashable
import Data.List (delete, nub, sortBy)
import qualified Data.Map.Strict as Map
import Data.Maybe (fromJust, fromMaybe, isJust,
@ -144,6 +147,7 @@ a `localCompare` b
-- | Data for managing the virtual server nodes of this real node.
-- Also contains shared data and config values.
-- TODO: more data structures for k-choices bookkeeping
--data RealNode s = RealNode
data RealNode = RealNode
{ vservers :: [LocalNodeStateSTM]
-- ^ references to all active versers
@ -155,6 +159,7 @@ data RealNode = RealNode
-- ^ a global cache of looked up keys and their associated nodes
}
--type RealNodeSTM s = TVar (RealNode s)
type RealNodeSTM = TVar RealNode
-- | represents a node and all its important state
@ -411,6 +416,26 @@ data FediChordConf = FediChordConf
}
deriving (Show, Eq)
-- ====== Service Types ============
class Service s d where
-- | run the service
runService :: ServiceConf -> d -> IO (s d)
getServicePort' :: (Integral i) => s d -> i
instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
hash = Hashable.hash . getNodeID
data ServiceConf = ServiceConf
{ confSubscriptionExpiryTime :: Integer
-- ^ subscription lease expiration in seconds
, confServicePort :: Int
-- ^ listening port for service
, confServiceHost :: String
-- ^ hostname of service
}
class DHT d where
-- | lookup the responsible host handling a given key string,
-- possiblggy from a lookup cache

View file

@ -14,7 +14,7 @@ import Control.Concurrent.STM
import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar
import Control.Monad (forM_, forever)
import Control.Monad (foldM, forM_, forever)
import Control.Monad.IO.Class (liftIO)
import qualified Data.ByteString.Lazy.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap
@ -32,12 +32,10 @@ import Servant
import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap
import Hash2Pub.ServiceTypes
data PostService d = PostService
{ psPort :: Warp.Port
, psHost :: String
{ serviceConf :: ServiceConf
-- queues, other data structures
, baseDHT :: (DHT d) => d
, serviceThread :: TVar ThreadId
@ -56,15 +54,17 @@ type PostID = Txt.Text
type PostContent = Txt.Text
-- | For each handled tag, store its subscribers and provide a
-- broadcast 'TChan' for enqueuing posts
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag)
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
type TagSubscribersSTM = TVar TagSubscribers
-- | each subscriber is identified by its contact data "hostname" "port"
-- and holds a TChan duplicated from the broadcast TChan of the tag
type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID))
-- + an expiration timestamp
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
instance DHT d => Service PostService d where
-- | initialise 'PostService' data structures and run server
runService dht host port = do
runService conf dht = do
-- create necessary TVars
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
subscriberVar <- newTVarIO emptyRMap
@ -73,8 +73,7 @@ instance DHT d => Service PostService d where
relayInQueue' <- newTQueueIO
let
thisService = PostService {
psPort = port'
, psHost = host
serviceConf = conf
, baseDHT = dht
, serviceThread = threadVar
, subscribers = subscriberVar
@ -82,8 +81,8 @@ instance DHT d => Service PostService d where
, ownPosts = ownPostVar
, relayInQueue = relayInQueue'
}
port' = fromIntegral port
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
port' = fromIntegral (confServicePort conf)
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
-- Run 'concurrently_' from another thread to be able to return the
-- 'PostService'.
-- Terminating that parent thread will make all child threads terminate as well.
@ -96,7 +95,7 @@ instance DHT d => Service PostService d where
atomically $ writeTVar threadVar servThreadID
pure thisService
getServicePort s = fromIntegral $ psPort s
getServicePort' = fromIntegral . confServicePort . serviceConf
-- | return a WAI application
@ -115,7 +114,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
-- ^ endpoint for delivering the subscriptions and outstanding queue
@ -123,6 +122,8 @@ type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> Put
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
-- ^ endpoint for fetching multiple posts at once
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- ^ delivery endpoint of newly published posts of the relay's instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
-- ^ delivery endpoint for posts of $tag at subscribing instance
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
@ -139,28 +140,28 @@ postServer service = relayInbox service
:<|> subscriptionDelivery service
:<|> postFetch service
:<|> postMultiFetch service
:<|> postInbox service
:<|> tagDelivery service
:<|> tagSubscribe service
:<|> tagUnsubscribe service
relayInbox :: PostService d -> Txt.Text -> Handler NoContent
relayInbox serv post = do
-- extract contained hashtags
relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent
relayInbox serv tag posts = do
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
-- skip checking whether the post actually contains the tag, just drop full post
postIDs = head . Txt.splitOn "," <$> Txt.lines posts
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
-- if tag is not in own responsibility, return a 410 Gone
maybe
(throwError $ err410 { errBody = "Relay is not responsible for this tag"})
-- otherwise enqueue posts into broadcast queue of the tag
(\queue ->
liftIO $ forM_ postIDs (atomically . writeTChan queue)
)
broadcastChan
pure NoContent
subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent
subscriptionDelivery serv subList = do
let
@ -172,10 +173,12 @@ subscriptionDelivery serv subList = do
processTag :: TVar RelayTags -> Txt.Text -> IO ()
processTag subscriberSTM tagData = do
let
tag:subText:posts:_ = Txt.splitOn "," tagData
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
-- ignore checking of lease time
leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer)
sub = read . Txt.unpack $ subText :: (String, Int)
postList = Txt.words posts
enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
@ -186,9 +189,34 @@ postFetch serv postID = do
then pure placeholderPost
else throwError $ err404 { errBody = "No post found with this ID" }
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: "
<> (Txt.unwords . Txt.lines $ postIDs)
postMultiFetch serv postIDs = do
let idList = Txt.lines postIDs
postSet <- liftIO . readTVarIO . ownPosts $ serv
-- look up existence of all given post IDs, fail if even one is missing
foldM (\response postID ->
if HSet.member postID postSet
then pure $ placeholderPost <> "\n" <> response
else throwError $ err404 { errBody = "No post found with this ID" }
) "" idList
postInbox :: PostService d -> Txt.Text -> Handler NoContent
postInbox serv post = do
-- extract contained hashtags
let
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
-- generate post ID
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
-- add ID to own posts
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
-- enqueue a relay job for each tag
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
)
pure NoContent
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
@ -204,39 +232,61 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro
-- | Write all pending posts of a subscriber-tag-combination to its queue.
-- Sets up all necessary data structures if they are still missing.
enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map
enqueueSubscription :: TVar RelayTags -- tag-subscriber map
-> Hashtag -- hashtag of pending posts
-> (String, Int) -- subscriber's connection information
-> [PostID] -- pending posts
-> POSIXTime -- lease expiry time
-> IO ()
enqueueSubscriptions tagMapSTM tag subscriber posts = do
enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
-- get the tag output queue and, if necessary, create it
subChan <- atomically setupSubscriberChannel
subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime
forM_ posts (atomically . writeTChan subChan)
where
setupSubscriberChannel :: STM (TChan PostID)
setupSubscriberChannel = do
tagMap <- readTVar tagMapSTM
case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of
Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map
broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do
-- otherwise use the existing subscriber map
foundSubMap <- readTVar foundSubMapSTM
case HMap.lookup subscriber foundSubMap of
Nothing -> do
-- for new subscribers, create new output channel
tagOutChan <- dupTChan broadcastChan
writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap
pure tagOutChan
-- existing subscriber's channels are just returned
Just tagOutChan -> pure tagOutChan
-- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
-- If the queue doesn't exist yet, all necessary data structures are set up accordingly.
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
tagMap <- readTVar tagMapSTM
case lookupRelayTags tag tagMap of
Nothing -> do
-- if no collision/ tag doesn't exist yet, just initialize a
-- new subscriber map
broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do
-- otherwise use the existing subscriber map
foundSubMap <- readTVar foundSubMapSTM
case HMap.lookup subscriber foundSubMap of
Nothing -> do
-- for new subscribers, create new output channel
tagOutChan <- dupTChan broadcastChan
writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap
pure tagOutChan
-- existing subscriber's channels are just returned
Just (tagOutChan, _) -> pure tagOutChan
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
getTagBroadcastChannel serv tag = do
tagMap <- readTVar $ subscribers serv
case lookupRelayTags tag tagMap of
Nothing -> pure Nothing
Just (subscriberSTM, broadcastChan, _) -> do
subscriberMap <- readTVar subscriberSTM
if HMap.null subscriberMap
then pure Nothing
else pure (Just broadcastChan)
-- | look up the subscription data of a tag
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag)
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag)
-- normalise the unicode representation of a string to NFC

View file

@ -1,15 +0,0 @@
{-# LANGUAGE MultiParamTypeClasses #-}
module Hash2Pub.ServiceTypes where
import Data.Hashable (Hashable (..))
import Hash2Pub.FediChord (DHT (..), NodeID (..))
class Service s d where
-- | run the service
runService :: (Integral i) => d -> String -> i -> IO (s d)
getServicePort :: (Integral i) => s d -> i
instance Hashable NodeID where
hashWithSalt salt = hashWithSalt salt . getNodeID
hash = hash . getNodeID