Compare commits
No commits in common. "4bf80911432da8db7b1d5bc9278885310c1517c7" and "63bc06a88e3e6827c92a4ea51f4ec447de000dc5" have entirely different histories.
4bf8091143
...
63bc06a88e
|
@ -55,7 +55,7 @@ library
|
||||||
import: deps
|
import: deps
|
||||||
|
|
||||||
-- Modules exported by the library.
|
-- Modules exported by the library.
|
||||||
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.RingMap
|
exposed-modules: Hash2Pub.FediChord, Hash2Pub.FediChordTypes, Hash2Pub.DHTProtocol, Hash2Pub.ASN1Coding, Hash2Pub.ProtocolTypes, Hash2Pub.PostService, Hash2Pub.ServiceTypes, Hash2Pub.RingMap
|
||||||
|
|
||||||
-- Modules included in this library but not exported.
|
-- Modules included in this library but not exported.
|
||||||
other-modules: Hash2Pub.Utils
|
other-modules: Hash2Pub.Utils
|
||||||
|
|
20
app/Main.hs
20
app/Main.hs
|
@ -10,17 +10,15 @@ import Data.IP (IPv6, toHostAddress6)
|
||||||
import System.Environment
|
import System.Environment
|
||||||
|
|
||||||
import Hash2Pub.FediChord
|
import Hash2Pub.FediChord
|
||||||
import Hash2Pub.FediChordTypes
|
|
||||||
import Hash2Pub.PostService (PostService (..))
|
|
||||||
|
|
||||||
main :: IO ()
|
main :: IO ()
|
||||||
main = do
|
main = do
|
||||||
-- ToDo: parse and pass config
|
-- ToDo: parse and pass config
|
||||||
-- probably use `tomland` for that
|
-- probably use `tomland` for that
|
||||||
(fConf, sConf) <- readConfig
|
conf <- readConfig
|
||||||
-- TODO: first initialise 'RealNode', then the vservers
|
-- TODO: first initialise 'RealNode', then the vservers
|
||||||
-- ToDo: load persisted caches, bootstrapping nodes …
|
-- ToDo: load persisted caches, bootstrapping nodes …
|
||||||
(serverSock, thisNode) <- fediChordInit fConf (runService sConf :: DHT d => d -> IO (PostService d))
|
(serverSock, thisNode) <- fediChordInit conf
|
||||||
-- currently no masking is necessary, as there is nothing to clean up
|
-- currently no masking is necessary, as there is nothing to clean up
|
||||||
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
|
nodeCacheWriterThread <- forkIO $ nodeCacheWriter thisNode
|
||||||
-- try joining the DHT using one of the provided bootstrapping nodes
|
-- try joining the DHT using one of the provided bootstrapping nodes
|
||||||
|
@ -43,11 +41,10 @@ main = do
|
||||||
pure ()
|
pure ()
|
||||||
|
|
||||||
|
|
||||||
readConfig :: IO (FediChordConf, ServiceConf)
|
readConfig :: IO FediChordConf
|
||||||
readConfig = do
|
readConfig = do
|
||||||
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : servicePortString : speedup : _ <- getArgs
|
confDomainString : ipString : portString : bootstrapHost : bootstrapPortString : _ <- getArgs
|
||||||
let
|
pure $ FediChordConf {
|
||||||
fConf = FediChordConf {
|
|
||||||
confDomain = confDomainString
|
confDomain = confDomainString
|
||||||
, confIP = toHostAddress6 . read $ ipString
|
, confIP = toHostAddress6 . read $ ipString
|
||||||
, confDhtPort = read portString
|
, confDhtPort = read portString
|
||||||
|
@ -56,10 +53,3 @@ readConfig = do
|
||||||
, confBootstrapSamplingInterval = 180
|
, confBootstrapSamplingInterval = 180
|
||||||
, confMaxLookupCacheAge = 300
|
, confMaxLookupCacheAge = 300
|
||||||
}
|
}
|
||||||
sConf = ServiceConf {
|
|
||||||
confSubscriptionExpiryTime = 2*3600 `div` read speedup
|
|
||||||
, confServicePort = read servicePortString
|
|
||||||
, confServiceHost = confDomainString
|
|
||||||
}
|
|
||||||
pure (fConf, sConf)
|
|
||||||
|
|
||||||
|
|
|
@ -95,23 +95,16 @@ import Debug.Trace (trace)
|
||||||
|
|
||||||
-- | initialise data structures, compute own IDs and bind to listening socket
|
-- | initialise data structures, compute own IDs and bind to listening socket
|
||||||
-- ToDo: load persisted state, thus this function already operates in IO
|
-- ToDo: load persisted state, thus this function already operates in IO
|
||||||
fediChordInit :: (Service s RealNodeSTM)
|
fediChordInit :: FediChordConf -> IO (Socket, LocalNodeStateSTM)
|
||||||
=> FediChordConf
|
fediChordInit initConf = do
|
||||||
-> (RealNodeSTM -> IO (s RealNodeSTM)) -- ^ runner function for service
|
|
||||||
-> IO (Socket, LocalNodeStateSTM)
|
|
||||||
fediChordInit initConf serviceRunner = do
|
|
||||||
emptyLookupCache <- newTVarIO Map.empty
|
emptyLookupCache <- newTVarIO Map.empty
|
||||||
let realNode = RealNode {
|
let realNode = RealNode {
|
||||||
vservers = []
|
vservers = []
|
||||||
, nodeConfig = initConf
|
, nodeConfig = initConf
|
||||||
, bootstrapNodes = confBootstrapNodes initConf
|
, bootstrapNodes = confBootstrapNodes initConf
|
||||||
, lookupCacheSTM = emptyLookupCache
|
, lookupCacheSTM = emptyLookupCache
|
||||||
--, service = undefined
|
|
||||||
}
|
}
|
||||||
realNodeSTM <- newTVarIO realNode
|
realNodeSTM <- newTVarIO realNode
|
||||||
-- launch service and set the reference in the RealNode
|
|
||||||
serv <- serviceRunner realNodeSTM
|
|
||||||
--atomically . writeTVar $ realNode { service = serv }
|
|
||||||
initialState <- nodeStateInit realNodeSTM
|
initialState <- nodeStateInit realNodeSTM
|
||||||
initialStateSTM <- newTVarIO initialState
|
initialStateSTM <- newTVarIO initialState
|
||||||
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
|
serverSock <- mkServerSocket (getIpAddr initialState) (getDhtPort initialState)
|
||||||
|
|
|
@ -58,14 +58,11 @@ module Hash2Pub.FediChordTypes (
|
||||||
, bsAsIpAddr
|
, bsAsIpAddr
|
||||||
, FediChordConf(..)
|
, FediChordConf(..)
|
||||||
, DHT(..)
|
, DHT(..)
|
||||||
, Service(..)
|
|
||||||
, ServiceConf(..)
|
|
||||||
) where
|
) where
|
||||||
|
|
||||||
import Control.Exception
|
import Control.Exception
|
||||||
import Data.Foldable (foldr')
|
import Data.Foldable (foldr')
|
||||||
import Data.Function (on)
|
import Data.Function (on)
|
||||||
import qualified Data.Hashable as Hashable
|
|
||||||
import Data.List (delete, nub, sortBy)
|
import Data.List (delete, nub, sortBy)
|
||||||
import qualified Data.Map.Strict as Map
|
import qualified Data.Map.Strict as Map
|
||||||
import Data.Maybe (fromJust, fromMaybe, isJust,
|
import Data.Maybe (fromJust, fromMaybe, isJust,
|
||||||
|
@ -147,7 +144,6 @@ a `localCompare` b
|
||||||
-- | Data for managing the virtual server nodes of this real node.
|
-- | Data for managing the virtual server nodes of this real node.
|
||||||
-- Also contains shared data and config values.
|
-- Also contains shared data and config values.
|
||||||
-- TODO: more data structures for k-choices bookkeeping
|
-- TODO: more data structures for k-choices bookkeeping
|
||||||
--data RealNode s = RealNode
|
|
||||||
data RealNode = RealNode
|
data RealNode = RealNode
|
||||||
{ vservers :: [LocalNodeStateSTM]
|
{ vservers :: [LocalNodeStateSTM]
|
||||||
-- ^ references to all active versers
|
-- ^ references to all active versers
|
||||||
|
@ -159,7 +155,6 @@ data RealNode = RealNode
|
||||||
-- ^ a global cache of looked up keys and their associated nodes
|
-- ^ a global cache of looked up keys and their associated nodes
|
||||||
}
|
}
|
||||||
|
|
||||||
--type RealNodeSTM s = TVar (RealNode s)
|
|
||||||
type RealNodeSTM = TVar RealNode
|
type RealNodeSTM = TVar RealNode
|
||||||
|
|
||||||
-- | represents a node and all its important state
|
-- | represents a node and all its important state
|
||||||
|
@ -416,26 +411,6 @@ data FediChordConf = FediChordConf
|
||||||
}
|
}
|
||||||
deriving (Show, Eq)
|
deriving (Show, Eq)
|
||||||
|
|
||||||
-- ====== Service Types ============
|
|
||||||
|
|
||||||
class Service s d where
|
|
||||||
-- | run the service
|
|
||||||
runService :: ServiceConf -> d -> IO (s d)
|
|
||||||
getServicePort' :: (Integral i) => s d -> i
|
|
||||||
|
|
||||||
instance Hashable.Hashable NodeID where
|
|
||||||
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID
|
|
||||||
hash = Hashable.hash . getNodeID
|
|
||||||
|
|
||||||
data ServiceConf = ServiceConf
|
|
||||||
{ confSubscriptionExpiryTime :: Integer
|
|
||||||
-- ^ subscription lease expiration in seconds
|
|
||||||
, confServicePort :: Int
|
|
||||||
-- ^ listening port for service
|
|
||||||
, confServiceHost :: String
|
|
||||||
-- ^ hostname of service
|
|
||||||
}
|
|
||||||
|
|
||||||
class DHT d where
|
class DHT d where
|
||||||
-- | lookup the responsible host handling a given key string,
|
-- | lookup the responsible host handling a given key string,
|
||||||
-- possiblggy from a lookup cache
|
-- possiblggy from a lookup cache
|
||||||
|
|
|
@ -14,7 +14,7 @@ import Control.Concurrent.STM
|
||||||
import Control.Concurrent.STM.TChan
|
import Control.Concurrent.STM.TChan
|
||||||
import Control.Concurrent.STM.TQueue
|
import Control.Concurrent.STM.TQueue
|
||||||
import Control.Concurrent.STM.TVar
|
import Control.Concurrent.STM.TVar
|
||||||
import Control.Monad (foldM, forM_, forever)
|
import Control.Monad (forM_, forever)
|
||||||
import Control.Monad.IO.Class (liftIO)
|
import Control.Monad.IO.Class (liftIO)
|
||||||
import qualified Data.ByteString.Lazy.UTF8 as BSU
|
import qualified Data.ByteString.Lazy.UTF8 as BSU
|
||||||
import qualified Data.HashMap.Strict as HMap
|
import qualified Data.HashMap.Strict as HMap
|
||||||
|
@ -32,10 +32,12 @@ import Servant
|
||||||
|
|
||||||
import Hash2Pub.FediChordTypes
|
import Hash2Pub.FediChordTypes
|
||||||
import Hash2Pub.RingMap
|
import Hash2Pub.RingMap
|
||||||
|
import Hash2Pub.ServiceTypes
|
||||||
|
|
||||||
|
|
||||||
data PostService d = PostService
|
data PostService d = PostService
|
||||||
{ serviceConf :: ServiceConf
|
{ psPort :: Warp.Port
|
||||||
|
, psHost :: String
|
||||||
-- queues, other data structures
|
-- queues, other data structures
|
||||||
, baseDHT :: (DHT d) => d
|
, baseDHT :: (DHT d) => d
|
||||||
, serviceThread :: TVar ThreadId
|
, serviceThread :: TVar ThreadId
|
||||||
|
@ -54,17 +56,15 @@ type PostID = Txt.Text
|
||||||
type PostContent = Txt.Text
|
type PostContent = Txt.Text
|
||||||
-- | For each handled tag, store its subscribers and provide a
|
-- | For each handled tag, store its subscribers and provide a
|
||||||
-- broadcast 'TChan' for enqueuing posts
|
-- broadcast 'TChan' for enqueuing posts
|
||||||
type RelayTags = RingMap NodeID (TagSubscribersSTM, TChan PostID, Hashtag)
|
type RelayTags = RingMap NodeID (TagSubscribers, TChan PostID, Hashtag)
|
||||||
type TagSubscribersSTM = TVar TagSubscribers
|
|
||||||
-- | each subscriber is identified by its contact data "hostname" "port"
|
-- | each subscriber is identified by its contact data "hostname" "port"
|
||||||
-- and holds a TChan duplicated from the broadcast TChan of the tag
|
-- and holds a TChan duplicated from the broadcast TChan of the tag
|
||||||
-- + an expiration timestamp
|
type TagSubscribers = TVar (HMap.HashMap (String, Int) (TChan PostID))
|
||||||
type TagSubscribers = (HMap.HashMap (String, Int) (TChan PostID, POSIXTime))
|
|
||||||
|
|
||||||
|
|
||||||
instance DHT d => Service PostService d where
|
instance DHT d => Service PostService d where
|
||||||
-- | initialise 'PostService' data structures and run server
|
-- | initialise 'PostService' data structures and run server
|
||||||
runService conf dht = do
|
runService dht host port = do
|
||||||
-- create necessary TVars
|
-- create necessary TVars
|
||||||
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
|
threadVar <- newTVarIO =<< myThreadId -- own thread ID as placeholder
|
||||||
subscriberVar <- newTVarIO emptyRMap
|
subscriberVar <- newTVarIO emptyRMap
|
||||||
|
@ -73,7 +73,8 @@ instance DHT d => Service PostService d where
|
||||||
relayInQueue' <- newTQueueIO
|
relayInQueue' <- newTQueueIO
|
||||||
let
|
let
|
||||||
thisService = PostService {
|
thisService = PostService {
|
||||||
serviceConf = conf
|
psPort = port'
|
||||||
|
, psHost = host
|
||||||
, baseDHT = dht
|
, baseDHT = dht
|
||||||
, serviceThread = threadVar
|
, serviceThread = threadVar
|
||||||
, subscribers = subscriberVar
|
, subscribers = subscriberVar
|
||||||
|
@ -81,8 +82,8 @@ instance DHT d => Service PostService d where
|
||||||
, ownPosts = ownPostVar
|
, ownPosts = ownPostVar
|
||||||
, relayInQueue = relayInQueue'
|
, relayInQueue = relayInQueue'
|
||||||
}
|
}
|
||||||
port' = fromIntegral (confServicePort conf)
|
port' = fromIntegral port
|
||||||
warpSettings = Warp.setPort port' . Warp.setHost (fromString . confServiceHost $ conf) $ Warp.defaultSettings
|
warpSettings = Warp.setPort port' . Warp.setHost (fromString host) $ Warp.defaultSettings
|
||||||
-- Run 'concurrently_' from another thread to be able to return the
|
-- Run 'concurrently_' from another thread to be able to return the
|
||||||
-- 'PostService'.
|
-- 'PostService'.
|
||||||
-- Terminating that parent thread will make all child threads terminate as well.
|
-- Terminating that parent thread will make all child threads terminate as well.
|
||||||
|
@ -95,7 +96,7 @@ instance DHT d => Service PostService d where
|
||||||
atomically $ writeTVar threadVar servThreadID
|
atomically $ writeTVar threadVar servThreadID
|
||||||
pure thisService
|
pure thisService
|
||||||
|
|
||||||
getServicePort' = fromIntegral . confServicePort . serviceConf
|
getServicePort s = fromIntegral $ psPort s
|
||||||
|
|
||||||
|
|
||||||
-- | return a WAI application
|
-- | return a WAI application
|
||||||
|
@ -114,7 +115,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
|
||||||
|
|
||||||
-- ========= HTTP API and handlers =============
|
-- ========= HTTP API and handlers =============
|
||||||
|
|
||||||
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
type PostServiceAPI = "relay" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
||||||
-- ^ delivery endpoint of newly published posts of the relay's instance
|
-- ^ delivery endpoint of newly published posts of the relay's instance
|
||||||
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
|
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] NoContent
|
||||||
-- ^ endpoint for delivering the subscriptions and outstanding queue
|
-- ^ endpoint for delivering the subscriptions and outstanding queue
|
||||||
|
@ -122,8 +123,6 @@ type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBod
|
||||||
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
-- ^ fetch endpoint for posts, full post ID is http://$domain/post/$postid
|
||||||
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
|
:<|> "posts" :> ReqBody '[PlainText] Txt.Text :> Post '[PlainText] Txt.Text
|
||||||
-- ^ endpoint for fetching multiple posts at once
|
-- ^ endpoint for fetching multiple posts at once
|
||||||
:<|> "posts" :> "inbox" :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
|
|
||||||
-- ^ delivery endpoint of newly published posts of the relay's instance
|
|
||||||
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
|
:<|> "tags" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PostCreated '[PlainText] Txt.Text
|
||||||
-- ^ delivery endpoint for posts of $tag at subscribing instance
|
-- ^ delivery endpoint for posts of $tag at subscribing instance
|
||||||
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
|
:<|> "tags" :> Capture "hashtag" Txt.Text :> "subscribe" :> Header "Origin" Txt.Text :> Get '[PlainText] Integer
|
||||||
|
@ -140,28 +139,28 @@ postServer service = relayInbox service
|
||||||
:<|> subscriptionDelivery service
|
:<|> subscriptionDelivery service
|
||||||
:<|> postFetch service
|
:<|> postFetch service
|
||||||
:<|> postMultiFetch service
|
:<|> postMultiFetch service
|
||||||
:<|> postInbox service
|
|
||||||
:<|> tagDelivery service
|
:<|> tagDelivery service
|
||||||
:<|> tagSubscribe service
|
:<|> tagSubscribe service
|
||||||
:<|> tagUnsubscribe service
|
:<|> tagUnsubscribe service
|
||||||
|
|
||||||
|
|
||||||
relayInbox :: PostService d -> Hashtag -> Txt.Text -> Handler NoContent
|
relayInbox :: PostService d -> Txt.Text -> Handler NoContent
|
||||||
relayInbox serv tag posts = do
|
relayInbox serv post = do
|
||||||
|
-- extract contained hashtags
|
||||||
let
|
let
|
||||||
-- skip checking whether the post actually contains the tag, just drop full post
|
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
|
||||||
postIDs = head . Txt.splitOn "," <$> Txt.lines posts
|
-- generate post ID
|
||||||
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
|
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^128-1) :: IO Integer)
|
||||||
-- if tag is not in own responsibility, return a 410 Gone
|
-- add ID to own posts
|
||||||
maybe
|
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
|
||||||
(throwError $ err410 { errBody = "Relay is not responsible for this tag"})
|
-- enqueue a relay job for each tag
|
||||||
-- otherwise enqueue posts into broadcast queue of the tag
|
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
|
||||||
(\queue ->
|
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
|
||||||
liftIO $ forM_ postIDs (atomically . writeTChan queue)
|
)
|
||||||
)
|
|
||||||
broadcastChan
|
|
||||||
pure NoContent
|
pure NoContent
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent
|
subscriptionDelivery :: PostService d -> Txt.Text -> Handler NoContent
|
||||||
subscriptionDelivery serv subList = do
|
subscriptionDelivery serv subList = do
|
||||||
let
|
let
|
||||||
|
@ -173,12 +172,10 @@ subscriptionDelivery serv subList = do
|
||||||
processTag :: TVar RelayTags -> Txt.Text -> IO ()
|
processTag :: TVar RelayTags -> Txt.Text -> IO ()
|
||||||
processTag subscriberSTM tagData = do
|
processTag subscriberSTM tagData = do
|
||||||
let
|
let
|
||||||
tag:subText:lease:posts:_ = Txt.splitOn "," tagData
|
tag:subText:posts:_ = Txt.splitOn "," tagData
|
||||||
-- ignore checking of lease time
|
|
||||||
leaseTime = fromIntegral (read . Txt.unpack $ lease :: Integer)
|
|
||||||
sub = read . Txt.unpack $ subText :: (String, Int)
|
sub = read . Txt.unpack $ subText :: (String, Int)
|
||||||
postList = Txt.words posts
|
postList = Txt.words posts
|
||||||
enqueueSubscription subscriberSTM (normaliseTag tag) sub postList leaseTime
|
enqueueSubscriptions subscriberSTM (normaliseTag tag) sub postList
|
||||||
|
|
||||||
|
|
||||||
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
|
postFetch :: PostService d -> Txt.Text -> Handler Txt.Text
|
||||||
|
@ -189,34 +186,9 @@ postFetch serv postID = do
|
||||||
then pure placeholderPost
|
then pure placeholderPost
|
||||||
else throwError $ err404 { errBody = "No post found with this ID" }
|
else throwError $ err404 { errBody = "No post found with this ID" }
|
||||||
|
|
||||||
|
|
||||||
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
|
postMultiFetch :: PostService d -> Txt.Text -> Handler Txt.Text
|
||||||
postMultiFetch serv postIDs = do
|
postMultiFetch serv postIDs = pure $ "Here be multiple post dragons: "
|
||||||
let idList = Txt.lines postIDs
|
<> (Txt.unwords . Txt.lines $ postIDs)
|
||||||
postSet <- liftIO . readTVarIO . ownPosts $ serv
|
|
||||||
-- look up existence of all given post IDs, fail if even one is missing
|
|
||||||
foldM (\response postID ->
|
|
||||||
if HSet.member postID postSet
|
|
||||||
then pure $ placeholderPost <> "\n" <> response
|
|
||||||
else throwError $ err404 { errBody = "No post found with this ID" }
|
|
||||||
) "" idList
|
|
||||||
|
|
||||||
|
|
||||||
postInbox :: PostService d -> Txt.Text -> Handler NoContent
|
|
||||||
postInbox serv post = do
|
|
||||||
-- extract contained hashtags
|
|
||||||
let
|
|
||||||
containedTags = fmap (normaliseTag . Txt.tail) . filter ((==) '#' . Txt.head) . Txt.words $ post
|
|
||||||
-- generate post ID
|
|
||||||
postId <- liftIO $ Txt.pack . show <$> (randomRIO (0, 2^(128::Integer)-1) :: IO Integer)
|
|
||||||
-- add ID to own posts
|
|
||||||
liftIO . atomically $ modifyTVar' (ownPosts serv) (HSet.insert postId)
|
|
||||||
-- enqueue a relay job for each tag
|
|
||||||
liftIO $ forM_ (containedTags :: [Txt.Text]) (\tag ->
|
|
||||||
atomically $ writeTQueue (relayInQueue serv) (tag, postId, post)
|
|
||||||
)
|
|
||||||
pure NoContent
|
|
||||||
|
|
||||||
|
|
||||||
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
|
tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
|
||||||
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
|
tagDelivery serv hashtag posts = pure $ "Here be #" <> hashtag <> " dragons with " <> posts
|
||||||
|
@ -232,61 +204,39 @@ tagUnsubscribe serv hashtag origin = pure $ "Here be a dragon unsubscription fro
|
||||||
|
|
||||||
-- | Write all pending posts of a subscriber-tag-combination to its queue.
|
-- | Write all pending posts of a subscriber-tag-combination to its queue.
|
||||||
-- Sets up all necessary data structures if they are still missing.
|
-- Sets up all necessary data structures if they are still missing.
|
||||||
enqueueSubscription :: TVar RelayTags -- tag-subscriber map
|
enqueueSubscriptions :: TVar RelayTags -- tag-subscriber map
|
||||||
-> Hashtag -- hashtag of pending posts
|
-> Hashtag -- hashtag of pending posts
|
||||||
-> (String, Int) -- subscriber's connection information
|
-> (String, Int) -- subscriber's connection information
|
||||||
-> [PostID] -- pending posts
|
-> [PostID] -- pending posts
|
||||||
-> POSIXTime -- lease expiry time
|
|
||||||
-> IO ()
|
-> IO ()
|
||||||
enqueueSubscription tagMapSTM tag subscriber posts leaseTime = do
|
enqueueSubscriptions tagMapSTM tag subscriber posts = do
|
||||||
-- get the tag output queue and, if necessary, create it
|
-- get the tag output queue and, if necessary, create it
|
||||||
subChan <- atomically $ setupSubscriberChannel tagMapSTM tag subscriber leaseTime
|
subChan <- atomically setupSubscriberChannel
|
||||||
forM_ posts (atomically . writeTChan subChan)
|
forM_ posts (atomically . writeTChan subChan)
|
||||||
|
where
|
||||||
|
setupSubscriberChannel :: STM (TChan PostID)
|
||||||
-- | STM operation to return the outgoing post queue of a tag to a specified subscriber.
|
setupSubscriberChannel = do
|
||||||
-- If the queue doesn't exist yet, all necessary data structures are set up accordingly.
|
tagMap <- readTVar tagMapSTM
|
||||||
setupSubscriberChannel :: TVar RelayTags -> Hashtag -> (String, Int) -> POSIXTime -> STM (TChan PostID)
|
case rMapLookup (genKeyID . Txt.unpack $ tag) tagMap of
|
||||||
setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
|
Nothing -> do
|
||||||
tagMap <- readTVar tagMapSTM
|
-- if no collision/ tag doesn't exist yet, just initialize a
|
||||||
case lookupRelayTags tag tagMap of
|
-- new subscriber map
|
||||||
Nothing -> do
|
broadcastChan <- newBroadcastTChan
|
||||||
-- if no collision/ tag doesn't exist yet, just initialize a
|
tagOutChan <- dupTChan broadcastChan
|
||||||
-- new subscriber map
|
newSubMapSTM <- newTVar $ HMap.singleton subscriber tagOutChan
|
||||||
broadcastChan <- newBroadcastTChan
|
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
|
||||||
tagOutChan <- dupTChan broadcastChan
|
pure tagOutChan
|
||||||
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
|
Just (foundSubMapSTM, broadcastChan, _) -> do
|
||||||
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap
|
-- otherwise use the existing subscriber map
|
||||||
pure tagOutChan
|
foundSubMap <- readTVar foundSubMapSTM
|
||||||
Just (foundSubMapSTM, broadcastChan, _) -> do
|
case HMap.lookup subscriber foundSubMap of
|
||||||
-- otherwise use the existing subscriber map
|
Nothing -> do
|
||||||
foundSubMap <- readTVar foundSubMapSTM
|
-- for new subscribers, create new output channel
|
||||||
case HMap.lookup subscriber foundSubMap of
|
tagOutChan <- dupTChan broadcastChan
|
||||||
Nothing -> do
|
writeTVar foundSubMapSTM $ HMap.insert subscriber tagOutChan foundSubMap
|
||||||
-- for new subscribers, create new output channel
|
pure tagOutChan
|
||||||
tagOutChan <- dupTChan broadcastChan
|
-- existing subscriber's channels are just returned
|
||||||
writeTVar foundSubMapSTM $ HMap.insert subscriber (tagOutChan, leaseTime) foundSubMap
|
Just tagOutChan -> pure tagOutChan
|
||||||
pure tagOutChan
|
|
||||||
-- existing subscriber's channels are just returned
|
|
||||||
Just (tagOutChan, _) -> pure tagOutChan
|
|
||||||
|
|
||||||
|
|
||||||
-- | returns the broadcast channel of a hashtag if there are any subscribers to it
|
|
||||||
getTagBroadcastChannel :: PostService d -> Hashtag -> STM (Maybe (TChan PostID))
|
|
||||||
getTagBroadcastChannel serv tag = do
|
|
||||||
tagMap <- readTVar $ subscribers serv
|
|
||||||
case lookupRelayTags tag tagMap of
|
|
||||||
Nothing -> pure Nothing
|
|
||||||
Just (subscriberSTM, broadcastChan, _) -> do
|
|
||||||
subscriberMap <- readTVar subscriberSTM
|
|
||||||
if HMap.null subscriberMap
|
|
||||||
then pure Nothing
|
|
||||||
else pure (Just broadcastChan)
|
|
||||||
|
|
||||||
|
|
||||||
-- | look up the subscription data of a tag
|
|
||||||
lookupRelayTags :: Hashtag -> RelayTags -> Maybe (TagSubscribersSTM, TChan PostID, Hashtag)
|
|
||||||
lookupRelayTags tag = rMapLookup (genKeyID . Txt.unpack $ tag)
|
|
||||||
|
|
||||||
|
|
||||||
-- normalise the unicode representation of a string to NFC
|
-- normalise the unicode representation of a string to NFC
|
||||||
|
|
15
src/Hash2Pub/ServiceTypes.hs
Normal file
15
src/Hash2Pub/ServiceTypes.hs
Normal file
|
@ -0,0 +1,15 @@
|
||||||
|
{-# LANGUAGE MultiParamTypeClasses #-}
|
||||||
|
module Hash2Pub.ServiceTypes where
|
||||||
|
|
||||||
|
import Data.Hashable (Hashable (..))
|
||||||
|
|
||||||
|
import Hash2Pub.FediChord (DHT (..), NodeID (..))
|
||||||
|
|
||||||
|
class Service s d where
|
||||||
|
-- | run the service
|
||||||
|
runService :: (Integral i) => d -> String -> i -> IO (s d)
|
||||||
|
getServicePort :: (Integral i) => s d -> i
|
||||||
|
|
||||||
|
instance Hashable NodeID where
|
||||||
|
hashWithSalt salt = hashWithSalt salt . getNodeID
|
||||||
|
hash = hash . getNodeID
|
Loading…
Reference in a new issue