prototype instrumentation of periodically sending posts to a test setup of 3 nodes

- contributes to #59
This commit is contained in:
Trolli Schmittlauch 2020-08-27 12:01:08 +02:00
commit 3c1652d86d
5 changed files with 238 additions and 130 deletions

View file

@ -46,8 +46,8 @@ category: Network
extra-source-files: CHANGELOG.md extra-source-files: CHANGELOG.md
common deps common deps
build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types build-depends: base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
ghc-options: -Wall ghc-options: -Wall -Wpartial-fields
@ -93,6 +93,20 @@ executable Hash2Pub
ghc-options: -threaded ghc-options: -threaded
executable Experiment
-- experiment runner
import: deps
build-depends: Hash2Pub
main-is: Experiment.hs
hs-source-dirs: app
default-language: Haskell2010
ghc-options: -threaded
test-suite Hash2Pub-test test-suite Hash2Pub-test
-- Test dependencies. -- Test dependencies.

44
app/Experiment.hs Normal file
View file

@ -0,0 +1,44 @@
{-# LANGUAGE OverloadedStrings #-}
module Main where
import Control.Concurrent
import Control.Monad (forM_)
import Control.Monad.IO.Class
import Control.Monad.State.Class
import Control.Monad.State.Strict (evalStateT)
import qualified Network.HTTP.Client as HTTP
import System.Random
import Hash2Pub.PostService (Hashtag, clientPublishPost)
-- placeholder post data definition
tagsToPostTo = [ "JustSomeTag", "WantAnotherTag234", "HereWeGoAgain", "Oyä", "通信端末" ]
knownRelays :: [(String, Int)]
knownRelays =
[ ("animalliberation.social", 3342)
, ("hostux.social", 3343)
, ("social.diskseven.com", 3344)
, ("social.imirhil.fr", 3345)
]
main :: IO ()
main = do
-- initialise HTTP manager
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
-- initialise RNG
let initRGen = mkStdGen 12
-- cycle through tags and post to a random instance
evalStateT (forM_ (cycle tagsToPostTo) $ publishPostRandom httpMan) initRGen
-- wait for a specified time
publishPostRandom :: (RandomGen g, MonadIO m, MonadState g m) => HTTP.Manager -> Hashtag -> m ()
publishPostRandom httpman tag = do
index <- state $ randomR (0, length knownRelays - 1)
let (pubHost, pubPort) = knownRelays !! index
_ <- liftIO . forkIO $ do
postResult <- liftIO $ clientPublishPost httpman pubHost pubPort ("foobar #" <> tag)
either putStrLn (const $ pure ()) postResult
liftIO $ threadDelay 500

View file

@ -49,9 +49,11 @@ import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Exception import Control.Exception
import Control.Monad (foldM, forM, forM_, void, when) import Control.Monad (foldM, forM, forM_, void, when)
import Control.Monad.Except (MonadError (..), runExceptT)
import Control.Monad.IO.Class (MonadIO (..))
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights) import Data.Either (rights)
import Data.Foldable (foldl', foldr') import Data.Foldable (foldl', foldr', foldrM)
import Data.Functor.Identity import Data.Functor.Identity
import Data.IP (IPv6, fromHostAddress6, import Data.IP (IPv6, fromHostAddress6,
toHostAddress6) toHostAddress6)
@ -106,9 +108,6 @@ queryLocalCache ownState nCache lBestNodes targetID
-- the closest succeeding node (like with the p initiated parallel queries -- the closest succeeding node (like with the p initiated parallel queries
| otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache | otherwise = FORWARD $ closestSuccessor `Set.union` closestCachePredecessors (lBestNodes-1) targetID nCache
where where
ownID = getNid ownState
preds = predecessors ownState
closestSuccessor :: Set.Set RemoteCacheEntry closestSuccessor :: Set.Set RemoteCacheEntry
closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache closestSuccessor = maybe Set.empty (Set.singleton . toRemoteCacheEntry) $ cacheLookupSucc targetID nCache
@ -259,7 +258,6 @@ handleIncomingRequest :: Service s (RealNodeSTM s)
-> SockAddr -- ^ source address of the request -> SockAddr -- ^ source address of the request
-> IO () -> IO ()
handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
putStrLn $ "handling incoming request: " <> show msgSet
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
-- add nodestate to cache -- add nodestate to cache
now <- getPOSIXTime now <- getPOSIXTime
@ -314,7 +312,6 @@ handleIncomingRequest nsSTM sendQ msgSet sourceAddr = do
-- | execute a key ID lookup on local cache and respond with the result -- | execute a key ID lookup on local cache and respond with the result
respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondQueryID :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondQueryID nsSTM msgSet = do respondQueryID nsSTM msgSet = do
putStrLn "responding to a QueryID request"
-- this message cannot be split reasonably, so just -- this message cannot be split reasonably, so just
-- consider the first payload -- consider the first payload
let let
@ -435,7 +432,9 @@ respondJoin nsSTM msgSet = do
let let
aRequestPart = Set.elemAt 0 msgSet aRequestPart = Set.elemAt 0 msgSet
senderNS = sender aRequestPart senderNS = sender aRequestPart
responsibilityLookup = queryLocalCache nsSnap cache 1 (getNid senderNS) -- if not joined yet, attract responsibility for
-- all keys to make bootstrapping possible
responsibilityLookup = if isJoined nsSnap then queryLocalCache nsSnap cache 1 (getNid senderNS) else FOUND (toRemoteNodeState nsSnap)
thisNodeResponsible (FOUND _) = True thisNodeResponsible (FOUND _) = True
thisNodeResponsible (FORWARD _) = False thisNodeResponsible (FORWARD _) = False
-- check whether the joining node falls into our responsibility -- check whether the joining node falls into our responsibility
@ -492,7 +491,7 @@ requestJoin toJoinOn ownStateSTM = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ownState)
bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do bracket (mkSendSocket srcAddr (getDomain toJoinOn) (getDhtPort toJoinOn)) close (\sock -> do
-- extract own state for getting request information -- extract own state for getting request information
responses <- sendRequestTo 5000 3 (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock responses <- sendRequestTo (\rid -> Request rid (toRemoteNodeState ownState) 1 True Join (Just JoinRequestPayload)) sock
(cacheInsertQ, joinedState) <- atomically $ do (cacheInsertQ, joinedState) <- atomically $ do
stateSnap <- readTVar ownStateSTM stateSnap <- readTVar ownStateSTM
let let
@ -517,28 +516,28 @@ requestJoin toJoinOn ownStateSTM = do
([], Set.empty, Set.empty) ([], Set.empty, Set.empty)
responses responses
-- sort, slice and set the accumulated successors and predecessors -- sort, slice and set the accumulated successors and predecessors
newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap -- the contacted node itself is a successor as well and, with few
-- nodes, can be a predecessor as well
newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
writeTVar ownStateSTM newState writeTVar ownStateSTM newState
pure (cacheInsertQ, newState) pure (cacheInsertQ, newState)
-- execute the cache insertions -- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ mapM_ (\f -> f joinedState) cacheInsertQ
if responses == Set.empty if responses == Set.empty
then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState) else do
then pure $ Left "join error: no predecessors or successors" -- wait for migration data to be completely received
-- successful join waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
else do pure $ Right ownStateSTM
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
) )
`catch` (\e -> pure . Left $ displayException (e :: IOException)) `catch` (\e -> pure . Left $ displayException (e :: IOException))
-- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node requestQueryID :: (MonadIO m, MonadError String m)
=> LocalNodeState s -- ^ NodeState of the querying node
-> NodeID -- ^ target key ID to look up -> NodeID -- ^ target key ID to look up
-> IO RemoteNodeState -- ^ the node responsible for handling that key -> m RemoteNodeState -- ^ the node responsible for handling that key
-- 1. do a local lookup for the l closest nodes -- 1. do a local lookup for the l closest nodes
-- 2. create l sockets -- 2. create l sockets
-- 3. send a message async concurrently to all l nodes -- 3. send a message async concurrently to all l nodes
@ -546,23 +545,23 @@ requestQueryID :: LocalNodeState s -- ^ NodeState of the querying node
-- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
-- TODO: deal with lookup failures -- TODO: deal with lookup failures
requestQueryID ns targetID = do requestQueryID ns targetID = do
firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
-- TODO: make maxAttempts configurable -- TODO: make maxAttempts configurable
queryIdLookupLoop firstCacheSnapshot ns 50 targetID queryIdLookupLoop firstCacheSnapshot ns 50 targetID
-- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
-- return node itself as default fallback value against infinite recursion. -- return node itself as default fallback value against infinite recursion.
-- TODO: consider using an Either instead of a default value -- TODO: consider using an Either instead of a default value
queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
-- FOUND can only be returned if targetID is owned by local node -- FOUND can only be returned if targetID is owned by local node
case localResult of case localResult of
FOUND thisNode -> pure thisNode FOUND thisNode -> pure thisNode
FORWARD nodeSet -> do FORWARD nodeSet -> do
responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
now <- getPOSIXTime now <- liftIO getPOSIXTime
-- check for a FOUND and return it -- check for a FOUND and return it
case responseEntries of case responseEntries of
FOUND foundNode -> pure foundNode FOUND foundNode -> pure foundNode
@ -588,7 +587,7 @@ sendQueryIdMessages targetID ns lParam targets = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
-- ToDo: make attempts and timeout configurable -- ToDo: make attempts and timeout configurable
queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close ( queryThreads <- mapM (\resultNode -> async $ bracket (mkSendSocket srcAddr (getDomain resultNode) (getDhtPort resultNode)) close (
sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) sendRequestTo (lookupMessage targetID ns Nothing)
)) targets )) targets
-- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613 -- ToDo: process results immediately instead of waiting for the last one to finish, see https://stackoverflow.com/a/38815224/9198613
-- ToDo: exception handling, maybe log them -- ToDo: exception handling, maybe log them
@ -596,8 +595,10 @@ sendQueryIdMessages targetID ns lParam targets = do
-- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
now <- getPOSIXTime now <- getPOSIXTime
-- collect cache entries from all responses -- collect cache entries from all responses
foldM (\acc resp -> do foldrM (\resp acc -> do
let entrySet = case queryResult <$> payload resp of let
responseResult = queryResult <$> payload resp
entrySet = case responseResult of
Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now)
Just (FORWARD resultset) -> resultset Just (FORWARD resultset) -> resultset
_ -> Set.empty _ -> Set.empty
@ -607,10 +608,15 @@ sendQueryIdMessages targetID ns lParam targets = do
-- return accumulated QueryResult -- return accumulated QueryResult
pure $ case acc of pure $ case acc of
-- once a FOUND as been encountered, return this as a result -- once a FOUND as been encountered, return this as a result
isFound@FOUND{} -> isFound FOUND{} -> acc
FORWARD accSet -> FORWARD $ entrySet `Set.union` accSet FORWARD accSet
| maybe False isFound responseResult -> fromJust responseResult
| otherwise -> FORWARD $ entrySet `Set.union` accSet
) (FORWARD Set.empty) responses ) (FORWARD Set.empty) responses
where
isFound FOUND{} = True
isFound _ = False
-- | Create a QueryID message to be supplied to 'sendRequestTo' -- | Create a QueryID message to be supplied to 'sendRequestTo'
lookupMessage :: Integral i lookupMessage :: Integral i
@ -630,7 +636,7 @@ requestStabilise :: LocalNodeState s -- ^ sending node
-> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node -> IO (Either String ([RemoteNodeState], [RemoteNodeState])) -- ^ (predecessors, successors) of responding node
requestStabilise ns neighbour = do requestStabilise ns neighbour = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo 5000 3 (\rid -> responses <- bracket (mkSendSocket srcAddr (getDomain neighbour) (getDhtPort neighbour)) close (fmap Right . sendRequestTo (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -652,8 +658,7 @@ requestStabilise ns neighbour = do
) )
([],[]) respSet ([],[]) respSet
-- update successfully responded neighbour in cache -- update successfully responded neighbour in cache
now <- getPOSIXTime maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
pure $ if null responsePreds && null responseSuccs pure $ if null responsePreds && null responseSuccs
then Left "no neighbours returned" then Left "no neighbours returned"
else Right (responsePreds, responseSuccs) else Right (responsePreds, responseSuccs)
@ -674,7 +679,7 @@ requestLeave ns doMigration target = do
, leavePredecessors = predecessors ns , leavePredecessors = predecessors ns
, leaveDoMigration = doMigration , leaveDoMigration = doMigration
} }
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo 5000 3 (\rid -> responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close (fmap Right . sendRequestTo (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -699,7 +704,7 @@ requestPing ns target = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close responses <- bracket (mkSendSocket srcAddr (getDomain target) (getDhtPort target)) close
(\sock -> do (\sock -> do
resp <- sendRequestTo 5000 3 (\rid -> resp <- sendRequestTo (\rid ->
Request { Request {
requestID = rid requestID = rid
, sender = toRemoteNodeState ns , sender = toRemoteNodeState ns
@ -735,25 +740,31 @@ requestPing ns target = do
) responses ) responses
-- | 'sendRequestToWithParams' with default timeout and retries already specified.
-- Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a default timeout.
sendRequestTo :: (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo = sendRequestToWithParams 5000 3
-- | Generic function for sending a request over a connected socket and collecting the response. -- | Generic function for sending a request over a connected socket and collecting the response.
-- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout. -- Serialises the message and tries to deliver its parts for a number of attempts within a specified timeout.
sendRequestTo :: Int -- ^ timeout in seconds sendRequestToWithParams :: Int -- ^ timeout in milliseconds
-> Int -- ^ number of retries -> Int -- ^ number of retries
-> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID -> (Integer -> FediChordMessage) -- ^ the message to be sent, still needing a requestID
-> Socket -- ^ connected socket to use for sending -> Socket -- ^ connected socket to use for sending
-> IO (Set.Set FediChordMessage) -- ^ responses -> IO (Set.Set FediChordMessage) -- ^ responses
sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
-- give the message a random request ID -- give the message a random request ID
randomID <- randomRIO (0, 2^32-1) randomID <- randomRIO (0, 2^32-1)
let let
msgComplete = msgIncomplete randomID msgComplete = msgIncomplete randomID
requests = serialiseMessage sendMessageSize msgComplete requests = serialiseMessage sendMessageSize msgComplete
putStrLn $ "sending request message " <> show msgComplete
-- create a queue for passing received response messages back, even after a timeout -- create a queue for passing received response messages back, even after a timeout
responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets responseQ <- newTBQueueIO $ 2*maximumParts -- keep room for duplicate packets
-- start sendAndAck with timeout -- start sendAndAck with timeout
attempts numAttempts . timeout timeoutMillis $ sendAndAck responseQ sock requests attempts numAttempts . timeout (timeoutMillis*1000) $ sendAndAck responseQ sock requests
-- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary. -- after timeout, check received responses, delete them from unacked message set/ map and rerun senAndAck with that if necessary.
recvdParts <- atomically $ flushTBQueue responseQ recvdParts <- atomically $ flushTBQueue responseQ
pure $ Set.fromList recvdParts pure $ Set.fromList recvdParts
@ -762,19 +773,20 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
-> Socket -- ^ the socket used for sending and receiving for this particular remote node -> Socket -- ^ the socket used for sending and receiving for this particular remote node
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> IO () -> IO ()
sendAndAck responseQueue sock remainingSends = do sendAndAck responseQueue sock' remainingSends = do
sendMany sock $ Map.elems remainingSends sendMany sock' $ Map.elems remainingSends
-- if all requests have been acked/ responded to, return prematurely -- if all requests have been acked/ responded to, return prematurely
recvLoop responseQueue remainingSends Set.empty Nothing recvLoop sock' responseQueue remainingSends Set.empty Nothing
recvLoop :: TBQueue FediChordMessage -- ^ the queue for putting in the received responses recvLoop :: Socket
-> TBQueue FediChordMessage -- ^ the queue for putting in the received responses
-> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
-> Set.Set Integer -- ^ already received response part numbers -> Set.Set Integer -- ^ already received response part numbers
-> Maybe Integer -- ^ total number of response parts if already known -> Maybe Integer -- ^ total number of response parts if already known
-> IO () -> IO ()
recvLoop responseQueue remainingSends' receivedPartNums totalParts = do recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
-- 65535 is maximum length of UDP packets, as long as -- 65535 is maximum length of UDP packets, as long as
-- no IPv6 jumbograms are used -- no IPv6 jumbograms are used
response <- deserialiseMessage <$> recv sock 65535 response <- deserialiseMessage <$> recv sock' 65535
case response of case response of
Right msg@Response{} -> do Right msg@Response{} -> do
atomically $ writeTBQueue responseQueue msg atomically $ writeTBQueue responseQueue msg
@ -782,11 +794,12 @@ sendRequestTo timeoutMillis numAttempts msgIncomplete sock = do
newTotalParts = if isFinalPart msg then Just (part msg) else totalParts newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
newRemaining = Map.delete (part msg) remainingSends' newRemaining = Map.delete (part msg) remainingSends'
newReceivedParts = Set.insert (part msg) receivedPartNums newReceivedParts = Set.insert (part msg) receivedPartNums
if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
then pure () then pure ()
else recvLoop responseQueue newRemaining receivedPartNums newTotalParts else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts
-- drop errors and invalid messages -- drop errors and invalid messages
Left _ -> recvLoop responseQueue remainingSends' receivedPartNums totalParts Right Request{} -> pure () -- expecting a response, not a request
Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
-- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache -- | enqueue a list of RemoteCacheEntries to be added to the global NodeCache
@ -812,6 +825,18 @@ queueDeleteEntry :: NodeID
-> IO () -> IO ()
queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
-- | enqueue the timestamp update and verification marking of an entry in the
-- global 'NodeCache'.
queueUpdateVerifieds :: Foldable c
=> c NodeID
-> LocalNodeState s
-> IO ()
queueUpdateVerifieds nIds ns = do
now <- getPOSIXTime
forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
markCacheEntryAsVerified (Just now) nid'
-- | retry an IO action at most *i* times until it delivers a result -- | retry an IO action at most *i* times until it delivers a result
attempts :: Int -- ^ number of retries *i* attempts :: Int -- ^ number of retries *i*
-> IO (Maybe a) -- ^ action to retry -> IO (Maybe a) -- ^ action to retry

View file

@ -166,6 +166,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
currentlyResponsible <- liftEither lookupResp currentlyResponsible <- liftEither lookupResp
liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node -- 2. then send a join to the currently responsible node
liftIO $ putStrLn "send a bootstrap Join"
joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
liftEither joinResult liftEither joinResult
@ -225,7 +226,7 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns) srcAddr <- confIP . nodeConfig <$> readTVarIO (parentRealNode ns)
bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close ( bootstrapResponse <- bracket (mkSendSocket srcAddr bootstrapHost bootstrapPort) close (
-- Initialise an empty cache only with the responses from a bootstrapping node -- Initialise an empty cache only with the responses from a bootstrapping node
fmap Right . sendRequestTo 5000 3 (lookupMessage targetID ns Nothing) fmap Right . sendRequestTo (lookupMessage targetID ns Nothing)
) )
`catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException)) `catch` (\e -> pure . Left $ "Error at bootstrap QueryId: " <> displayException (e :: IOException))
@ -244,26 +245,24 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
) )
initCache resp initCache resp
currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
pure $ Right currentlyResponsible pure currentlyResponsible
-- | join a node to the DHT using the global node cache -- | join a node to the DHT using the global node cache
-- node's position. -- node's position.
fediChordVserverJoin :: Service s (RealNodeSTM s) fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
=> LocalNodeStateSTM s -- ^ the local 'NodeState' => LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordVserverJoin nsSTM = do fediChordVserverJoin nsSTM = do
ns <- readTVarIO nsSTM ns <- liftIO $ readTVarIO nsSTM
-- 1. get routed to the currently responsible node -- 1. get routed to the currently responsible node
currentlyResponsible <- requestQueryID ns $ getNid ns currentlyResponsible <- requestQueryID ns $ getNid ns
putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
-- 2. then send a join to the currently responsible node -- 2. then send a join to the currently responsible node
joinResult <- requestJoin currentlyResponsible nsSTM joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
case joinResult of liftEither joinResult
Left err -> pure . Left $ "Error joining on " <> err
Right joinedNS -> pure . Right $ joinedNS
fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
fediChordVserverLeave ns = do fediChordVserverLeave ns = do
@ -323,7 +322,7 @@ joinOnNewEntriesThread nsSTM = loop
pure () pure ()
-- otherwise try joining -- otherwise try joining
FORWARD _ -> do FORWARD _ -> do
joinResult <- fediChordVserverJoin nsSTM joinResult <- runExceptT $ fediChordVserverJoin nsSTM
either either
-- on join failure, sleep and retry -- on join failure, sleep and retry
-- TODO: make delay configurable -- TODO: make delay configurable
@ -504,18 +503,26 @@ stabiliseThread nsSTM = forever $ do
-- try looking up additional neighbours if list too short -- try looking up additional neighbours if list too short
forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
atomically $ do either
latestNs <- readTVar nsSTM (const $ pure ())
writeTVar nsSTM $ addPredecessors [nextEntry] latestNs (\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addPredecessors [entry] latestNs
)
nextEntry
) )
forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
ns' <- readTVarIO nsSTM ns' <- readTVarIO nsSTM
nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
atomically $ do either
latestNs <- readTVar nsSTM (const $ pure ())
writeTVar nsSTM $ addSuccessors [nextEntry] latestNs (\entry -> atomically $ do
latestNs <- readTVar nsSTM
writeTVar nsSTM $ addSuccessors [entry] latestNs
)
nextEntry
) )
newNs <- readTVarIO nsSTM newNs <- readTVarIO nsSTM
@ -638,7 +645,7 @@ requestMapPurge :: MVar RequestMap -> IO ()
requestMapPurge mapVar = forever $ do requestMapPurge mapVar = forever $ do
rMapState <- takeMVar mapVar rMapState <- takeMVar mapVar
now <- getPOSIXTime now <- getPOSIXTime
putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) -> putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
now - ts < responsePurgeAge now - ts < responsePurgeAge
) rMapState ) rMapState
threadDelay $ round responsePurgeAge * 2 * 10^6 threadDelay $ round responsePurgeAge * 2 * 10^6
@ -716,7 +723,7 @@ fediMessageHandler sendQ recvQ nsSTM = do
instance DHT (RealNodeSTM s) where instance DHT (RealNodeSTM s) where
lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring lookupKey nodeSTM keystring = getKeyResponsibility nodeSTM $ genKeyID keystring
forceLookupKey nodeSTM keystring = updateLookupCache nodeSTM $ genKeyID keystring forceLookupKey nodeSTM keystring = (putStrLn $ "forced responsibility lookup of #" <> keystring) >> (updateLookupCache nodeSTM $ genKeyID keystring)
-- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState -- potential better implementation: put all neighbours of all vservers and the vservers on a ringMap, look the key up and see whether it results in a LocalNodeState
isResponsibleFor nodeSTM key = do isResponsibleFor nodeSTM key = do
node <- readTVarIO nodeSTM node <- readTVarIO nodeSTM
@ -757,7 +764,7 @@ getKeyResponsibility nodeSTM lookupKey = do
-- new entry. -- new entry.
-- If no vserver is active in the DHT, 'Nothing' is returned. -- If no vserver is active in the DHT, 'Nothing' is returned.
updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
updateLookupCache nodeSTM lookupKey = do updateLookupCache nodeSTM keyToLookup = do
(node, lookupSource) <- atomically $ do (node, lookupSource) <- atomically $ do
node <- readTVar nodeSTM node <- readTVar nodeSTM
let firstVs = headMay (vservers node) let firstVs = headMay (vservers node)
@ -767,18 +774,25 @@ updateLookupCache nodeSTM lookupKey = do
pure (node, lookupSource) pure (node, lookupSource)
maybe (do maybe (do
-- if no local node available, delete cache entry and return Nothing -- if no local node available, delete cache entry and return Nothing
atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup
pure Nothing pure Nothing
) )
(\n -> do (\n -> do
-- start a lookup from the node, update the cache with the lookup result and return it -- start a lookup from the node, update the cache with the lookup result and return it
newResponsible <- requestQueryID n lookupKey -- TODO: better retry management, because having no vserver joined yet should
let newEntry = (getDomain newResponsible, getServicePort newResponsible) -- be treated differently than other reasons for not getting a result.
now <- getPOSIXTime newResponsible <- runExceptT $ requestQueryID n keyToLookup
-- atomic update against lost updates either
atomically $ modifyTVar' (lookupCacheSTM node) $ (const $ pure Nothing)
Map.insert lookupKey (CacheEntry False newEntry now) (\result -> do
pure $ Just newEntry let newEntry = (getDomain result, getServicePort result)
now <- getPOSIXTime
-- atomic update against lost updates
atomically $ modifyTVar' (lookupCacheSTM node) $
Map.insert keyToLookup (CacheEntry False newEntry now)
pure $ Just newEntry
)
newResponsible
) lookupSource ) lookupSource

View file

@ -10,37 +10,30 @@ module Hash2Pub.PostService where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TChan import Control.Exception (Exception (..), try)
import Control.Concurrent.STM.TChan import Control.Monad (foldM, forM, forM_, forever, void,
import Control.Concurrent.STM.TQueue when)
import Control.Concurrent.STM.TVar import Control.Monad.IO.Class (liftIO)
import Control.Exception (Exception (..), try)
import Control.Monad (foldM, forM, forM_, forever)
import Control.Monad.IO.Class (liftIO)
import Control.Monad.STM
import Data.Bifunctor import Data.Bifunctor
import qualified Data.ByteString.Lazy.UTF8 as BSUL import qualified Data.ByteString.Lazy.UTF8 as BSUL
import qualified Data.ByteString.UTF8 as BSU import qualified Data.ByteString.UTF8 as BSU
import qualified Data.HashMap.Strict as HMap import qualified Data.HashMap.Strict as HMap
import qualified Data.HashSet as HSet import qualified Data.HashSet as HSet
import Data.Maybe (fromMaybe, isJust) import Data.Maybe (fromMaybe, isJust)
import Data.String (fromString) import Data.String (fromString)
import qualified Data.Text.Lazy as Txt import qualified Data.Text.Lazy as Txt
import Data.Text.Normalize (NormalizationMode (NFC), import Data.Text.Normalize (NormalizationMode (NFC), normalize)
normalize)
import Data.Time.Clock.POSIX import Data.Time.Clock.POSIX
import Data.Typeable (Typeable) import Data.Typeable (Typeable)
import qualified Network.HTTP.Client as HTTP import qualified Network.HTTP.Client as HTTP
import qualified Network.HTTP.Types as HTTPT import qualified Network.HTTP.Types as HTTPT
import System.Random import System.Random
import Text.Read (readEither) import Text.Read (readEither)
import qualified Network.Wai.Handler.Warp as Warp import qualified Network.Wai.Handler.Warp as Warp
import Servant import Servant
import Servant.Client import Servant.Client
import Servant.Server
import Hash2Pub.FediChordTypes import Hash2Pub.FediChordTypes
import Hash2Pub.RingMap import Hash2Pub.RingMap
@ -150,7 +143,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
-- ========= HTTP API and handlers ============= -- ========= HTTP API and handlers =============
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance -- delivery endpoint at responsible relay for delivering posts of $tag for distribution
:<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue -- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
@ -187,11 +180,11 @@ relayInbox serv tag posts = do
-- skip checking whether the post actually contains the tag, just drop full post -- skip checking whether the post actually contains the tag, just drop full post
postIDs = head . Txt.splitOn "," <$> Txt.lines posts postIDs = head . Txt.splitOn "," <$> Txt.lines posts
-- if tag is not in own responsibility, return a 410 Gone -- if tag is not in own responsibility, return a 410 Gone
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ tag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId tag)
if responsible if responsible
then pure () then pure ()
else else
(throwError $ err410 { errBody = "Relay is not responsible for this tag"}) throwError $ err410 { errBody = "Relay is not responsible for this tag"}
broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag broadcastChan <- liftIO $ atomically $ getTagBroadcastChannel serv tag
maybe maybe
-- if noone subscribed to the tag, nothing needs to be done -- if noone subscribed to the tag, nothing needs to be done
@ -221,7 +214,7 @@ subscriptionDelivery serv senderID subList = do
-- not-handled tag occurs, this results in a single large transaction. -- not-handled tag occurs, this results in a single large transaction.
-- Hopefully the performance isn't too bad. -- Hopefully the performance isn't too bad.
res <- liftIO . atomically $ (foldM (\_ tag' -> do res <- liftIO . atomically $ (foldM (\_ tag' -> do
responsible <- isResponsibleForSTM (baseDHT serv) (genKeyID . Txt.unpack $ tag') responsible <- isResponsibleForSTM (baseDHT serv) (hashtagToId tag')
if responsible if responsible
then processTag (subscribers serv) tag' then processTag (subscribers serv) tag'
else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay") else throwSTM $ UnhandledTagException (Txt.unpack tag' <> " not handled by this relay")
@ -295,7 +288,7 @@ tagDelivery :: PostService d -> Txt.Text -> Txt.Text -> Handler Txt.Text
tagDelivery serv hashtag posts = do tagDelivery serv hashtag posts = do
let postIDs = Txt.lines posts let postIDs = Txt.lines posts
subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv subscriptions <- liftIO . readTVarIO . ownSubscriptions $ serv
if isJust (HMap.lookup (genKeyID . Txt.unpack $ hashtag) subscriptions) if isJust (HMap.lookup (hashtagToId hashtag) subscriptions)
then -- TODO: increase a counter/ statistics for received posts of this tag then -- TODO: increase a counter/ statistics for received posts of this tag
liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv) liftIO $ forM_ postIDs $ atomically . writeTQueue (postFetchQueue serv)
else -- silently drop posts from unsubscribed tags else -- silently drop posts from unsubscribed tags
@ -304,7 +297,7 @@ tagDelivery serv hashtag posts = do
tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer tagSubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Integer
tagSubscribe serv hashtag origin = do tagSubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible if not responsible
-- GONE if not responsible -- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" } then throwError err410 { errBody = "not responsible for this tag" }
@ -323,7 +316,7 @@ tagSubscribe serv hashtag origin = do
tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text tagUnsubscribe :: DHT d => PostService d -> Txt.Text -> Maybe Txt.Text -> Handler Txt.Text
tagUnsubscribe serv hashtag origin = do tagUnsubscribe serv hashtag origin = do
responsible <- liftIO $ isResponsibleFor (baseDHT serv) (genKeyID . Txt.unpack $ hashtag) responsible <- liftIO $ isResponsibleFor (baseDHT serv) (hashtagToId hashtag)
if not responsible if not responsible
-- GONE if not responsible -- GONE if not responsible
then throwError err410 { errBody = "not responsible for this tag" } then throwError err410 { errBody = "not responsible for this tag" }
@ -355,7 +348,7 @@ clientDeliverSubscriptions :: PostService d
-> (String, Int) -- ^ hostname and port of instance to deliver to -> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure -> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag intearval -- collect tag interval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
-- extract subscribers and posts -- extract subscribers and posts
@ -385,7 +378,7 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
Right _ -> do Right _ -> do
atomically $ atomically $
modifyTVar' (subscribers serv) $ \tagMap -> modifyTVar' (subscribers serv) $ \tagMap ->
foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags) foldr deleteRMapEntry tagMap ((\(_, _, t) -> hashtagToId t) <$> intervalTags)
pure . Right $ () pure . Right $ ()
where where
channelGetAll :: TChan a -> STM [a] channelGetAll :: TChan a -> STM [a]
@ -396,7 +389,8 @@ clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
-- | Subscribe the client to the given hashtag. On success it returns the given lease time. -- | Subscribe the client to the given hashtag. On success it returns the given lease time,
-- but also records the subscription in its own data structure.
clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer) clientSubscribeTo :: DHT d => PostService d -> Hashtag -> IO (Either String Integer)
clientSubscribeTo serv tag = do clientSubscribeTo serv tag = do
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
@ -413,7 +407,9 @@ clientSubscribeTo serv tag = do
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doSubscribe newRes False doSubscribe newRes False
Left err -> pure . Left . show $ err Left err -> pure . Left . show $ err
Right lease -> pure . Right $ lease Right lease -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.insert (hashtagToId tag) (fromInteger lease)
pure . Right $ lease
) )
lookupResponse lookupResponse
@ -435,7 +431,9 @@ clientUnsubscribeFrom serv tag = do
newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag) newRes <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
doUnsubscribe newRes False doUnsubscribe newRes False
Left err -> pure . Left . show $ err Left err -> pure . Left . show $ err
Right _ -> pure . Right $ () Right _ -> do
atomically . modifyTVar' (ownSubscriptions serv) $ HMap.delete (hashtagToId tag)
pure . Right $ ()
) )
lookupResponse lookupResponse
@ -445,11 +443,11 @@ clientUnsubscribeFrom serv tag = do
-- the post to the responsible relays. -- the post to the responsible relays.
-- As the initial publishing isn't done by a specific relay (but *to* a specific relay -- As the initial publishing isn't done by a specific relay (but *to* a specific relay
-- instead), the function does *not* take a PostService as argument. -- instead), the function does *not* take a PostService as argument.
clientPublishPost :: HTTP.Manager -- for better performance, a shared HTTP manager has to be provided clientPublishPost :: HTTP.Manager -- ^ for better performance, a shared HTTP manager has to be provided
-> String -- hostname -> String -- ^ hostname
-> Int -- port -> Int -- ^ port
-> PostContent -- post content -> PostContent -- ^ post content
-> IO (Either String ()) -- error or success -> IO (Either String ()) -- ^ error or success
clientPublishPost httpman hostname port postC = do clientPublishPost httpman hostname port postC = do
resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port "")) resp <- runClientM (postInboxClient postC) (mkClientEnv httpman (BaseUrl Http hostname port ""))
pure . bimap show (const ()) $ resp pure . bimap show (const ()) $ resp
@ -492,7 +490,7 @@ setupSubscriberChannel tagMapSTM tag subscriber leaseTime = do
broadcastChan <- newBroadcastTChan broadcastChan <- newBroadcastTChan
tagOutChan <- dupTChan broadcastChan tagOutChan <- dupTChan broadcastChan
newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime) newSubMapSTM <- newTVar $ HMap.singleton subscriber (tagOutChan, leaseTime)
writeTVar tagMapSTM $ addRMapEntry (genKeyID . Txt.unpack $ tag) (newSubMapSTM, broadcastChan, tag) tagMap writeTVar tagMapSTM $ addRMapEntry (hashtagToId tag) (newSubMapSTM, broadcastChan, tag) tagMap
pure tagOutChan pure tagOutChan
Just (foundSubMapSTM, broadcastChan, _) -> do Just (foundSubMapSTM, broadcastChan, _) -> do
-- otherwise use the existing subscriber map -- otherwise use the existing subscriber map
@ -520,7 +518,7 @@ deleteSubscription tagMapSTM tag subscriber = do
-- if there are no subscriptions for the tag anymore, remove its -- if there are no subscriptions for the tag anymore, remove its
-- data sttructure altogether -- data sttructure altogether
if HMap.null newSubMap if HMap.null newSubMap
then writeTVar tagMapSTM $ deleteRMapEntry (genKeyID . Txt.unpack $ tag) tagMap then writeTVar tagMapSTM $ deleteRMapEntry (hashtagToId tag) tagMap
-- otherwise just remove the subscription of that node -- otherwise just remove the subscription of that node
else writeTVar foundSubMapSTM newSubMap else writeTVar foundSubMapSTM newSubMap
@ -541,13 +539,18 @@ getTagBroadcastChannel serv tag = do
-- | look up the subscription data of a tag -- | look up the subscription data of a tag
lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a lookupTagSubscriptions :: Hashtag -> RingMap NodeID a -> Maybe a
lookupTagSubscriptions tag = rMapLookup (genKeyID . Txt.unpack $ tag) lookupTagSubscriptions tag = rMapLookup (hashtagToId tag)
-- normalise the unicode representation of a string to NFC -- normalise the unicode representation of a string to NFC
normaliseTag :: Txt.Text -> Txt.Text normaliseTag :: Txt.Text -> Txt.Text
normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict normaliseTag = Txt.fromStrict . normalize NFC . Txt.toStrict
-- | convert a hashtag to its representation on the DHT
hashtagToId :: Hashtag -> NodeID
hashtagToId = genKeyID . Txt.unpack
-- | define how to convert all showable types to PlainText -- | define how to convert all showable types to PlainText
-- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯ -- No idea what I'm doing with these overlappable instances though ¯\_(ツ)_/¯
-- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap -- TODO: figure out how this overlapping stuff actually works https://downloads.haskell.org/~ghc/latest/docs/html/users_guide/glasgow_exts.html#instance-overlap
@ -566,12 +569,13 @@ processIncomingPosts serv = forever $ do
-- blocks until available -- blocks until available
-- TODO: process multiple in parallel -- TODO: process multiple in parallel
(tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv (tag, pID, pContent) <- atomically . readTQueue $ relayInQueue serv
let pIdUri = "http://" <> (Txt.pack . confServiceHost . serviceConf $ serv) <> ":" <> (fromString . show . confServicePort . serviceConf $ serv) <> "/post/" <> pID
lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag) lookupRes <- lookupKey (baseDHT serv) (Txt.unpack tag)
case lookupRes of case lookupRes of
-- no vserver active => wait and retry -- no vserver active => wait and retry
Nothing -> threadDelay $ 10 * 10^6 Nothing -> threadDelay $ 10 * 10^6
Just (responsibleHost, responsiblePort) -> do Just (responsibleHost, responsiblePort) -> do
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) "")) resp <- runClientM (relayInboxClient tag $ pIdUri <> "," <> pContent) (mkClientEnv (httpMan serv) (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
case resp of case resp of
Left err -> do Left err -> do
putStrLn $ "Error: " <> show err putStrLn $ "Error: " <> show err
@ -580,7 +584,14 @@ processIncomingPosts serv = forever $ do
-- TODO: keep track of maximum retries -- TODO: keep track of maximum retries
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag) _ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent) atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
Right yay -> putStrLn $ "Yay! " <> show yay Right _ -> do
-- TODO: stats
-- idea for the experiment: each post publication makes the initial posting instance subscribe to all contained tags
now <- getPOSIXTime
subscriptionStatus <- HMap.lookup (hashtagToId tag) <$> readTVarIO (ownSubscriptions serv)
-- if not yet subscribed or subscription expires within 2 minutes, (re)subscribe to tag
when (maybe False (\subLease -> now - subLease < 120) subscriptionStatus) $
void $ clientSubscribeTo serv tag
-- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID -- | process the pending fetch jobs of delivered post IDs: Delivered posts are tried to be fetched from their URI-ID