Compare commits


No commits in common. "1b5fc039b308cee299077f16374c25bbc57aa60c" and "3bd4cb667db2ee4c94c2537e958cfd29e6819f72" have entirely different histories.

5 changed files with 90 additions and 118 deletions

View file

@@ -47,7 +47,7 @@ extra-source-files: CHANGELOG.md
 common deps
   build-depends:    base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays
-  ghc-options:      -Wall -Wpartial-fields
+  ghc-options:      -Wall
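For context on the dropped warning flag: -Wpartial-fields reports record selectors that are not defined for every constructor of a type. A minimal standalone illustration, not taken from this repository:

-- Hypothetical example: with -Wall -Wpartial-fields, GHC warns that
-- 'foundNode' and 'forwardSet' are partial selectors, because evaluating
-- 'foundNode (Forward [])' would fail at runtime.
data Response
    = Found   { foundNode  :: Int }
    | Forward { forwardSet :: [Int] }

main :: IO ()
main = print (foundNode (Found 42))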

View file

@@ -1,16 +1,16 @@
 {-# LANGUAGE OverloadedStrings #-}
 module Main where
-import Control.Concurrent
-import Control.Monad (forM_)
-import Control.Monad.IO.Class
+import System.Random
+import Control.Concurrent
+import Control.Monad (forM_)
 import Control.Monad.State.Class
 import Control.Monad.State.Strict (evalStateT)
-import qualified Network.HTTP.Client as HTTP
-import System.Random
-import Hash2Pub.PostService (Hashtag, clientPublishPost)
+import Control.Monad.IO.Class
+import qualified Network.HTTP.Client as HTTP
+import Hash2Pub.PostService (clientPublishPost, Hashtag)
 -- placeholder post data definition

View file

@@ -49,11 +49,9 @@ import Control.Concurrent.STM.TQueue
 import Control.Concurrent.STM.TVar
 import Control.Exception
 import Control.Monad (foldM, forM, forM_, void, when)
-import Control.Monad.Except (MonadError (..), runExceptT)
-import Control.Monad.IO.Class (MonadIO (..))
 import qualified Data.ByteString as BS
 import Data.Either (rights)
-import Data.Foldable (foldl', foldr', foldrM)
+import Data.Foldable (foldl', foldr')
 import Data.Functor.Identity
 import Data.IP (IPv6, fromHostAddress6,
                 toHostAddress6)
@@ -516,28 +514,28 @@ requestJoin toJoinOn ownStateSTM = do
                     ([], Set.empty, Set.empty)
                     responses
             -- sort, slice and set the accumulated successors and predecessors
-            -- the contacted node itself is a successor as well and, with few
-            -- nodes, can be a predecessor as well
-            newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap
+            newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap
         writeTVar ownStateSTM newState
         pure (cacheInsertQ, newState)
     -- execute the cache insertions
     mapM_ (\f -> f joinedState) cacheInsertQ
     if responses == Set.empty
         then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
-        else do
-            -- wait for migration data to be completely received
-            waitForMigrationFrom (nodeService prn) (getNid toJoinOn)
-            pure $ Right ownStateSTM
+        else if null (predecessors joinedState) && null (successors joinedState)
+            then pure $ Left "join error: no predecessors or successors"
+            -- successful join
+            else do
+                -- wait for migration data to be completely received
+                waitForMigrationFrom (nodeService prn) (getNid ownState)
+                pure $ Right ownStateSTM
     )
     `catch` (\e -> pure . Left $ displayException (e :: IOException))
 -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID.
-requestQueryID :: (MonadIO m, MonadError String m)
-               => LocalNodeState s  -- ^ NodeState of the querying node
+requestQueryID :: LocalNodeState s  -- ^ NodeState of the querying node
                -> NodeID            -- ^ target key ID to look up
-               -> m RemoteNodeState -- ^ the node responsible for handling that key
+               -> IO RemoteNodeState -- ^ the node responsible for handling that key
 -- 1. do a local lookup for the l closest nodes
 -- 2. create l sockets
 -- 3. send a message async concurrently to all l nodes
@@ -545,23 +543,23 @@ requestQueryID :: (MonadIO m, MonadError String m)
 -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results)
 -- TODO: deal with lookup failures
 requestQueryID ns targetID = do
-    firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns
+    firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns
     -- TODO: make maxAttempts configurable
     queryIdLookupLoop firstCacheSnapshot ns 50 targetID
 -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining
-queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState
+queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState
 -- return node itself as default fallback value against infinite recursion.
 -- TODO: consider using an Either instead of a default value
-queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts"
+queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns
 queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do
     let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID
     -- FOUND can only be returned if targetID is owned by local node
     case localResult of
         FOUND thisNode -> pure thisNode
         FORWARD nodeSet -> do
-            responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
-            now <- liftIO getPOSIXTime
+            responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet)
+            now <- getPOSIXTime
             -- check for a FOUND and return it
             case responseEntries of
                 FOUND foundNode -> pure foundNode
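The signature change in the two hunks above is the core of this comparison: one side runs requestQueryID and queryIdLookupLoop in any MonadIO/MonadError monad and reports exhausted lookups with throwError, the other runs in plain IO and falls back to the node's own state. A minimal sketch of what that means for a caller, assuming only the signatures shown here; the helper name is hypothetical and the project imports (Control.Monad.Except, the Hash2Pub types) are taken as given by the surrounding module:

-- Sketch only: with the MonadError-based signature a caller in IO unwraps the
-- error explicitly via runExceptT; with the IO-based signature the same call
-- returns a RemoteNodeState directly and failures surface only as fallback
-- values or exceptions.
lookupAndPrint :: LocalNodeState s -> NodeID -> IO ()
lookupAndPrint ns targetID = do
    result <- runExceptT (requestQueryID ns targetID)  -- :: IO (Either String RemoteNodeState)
    case result of
        Left err          -> putStrLn ("lookup failed: " <> err)
        Right responsible -> putStrLn ("responsible node: " <> show (getNid responsible))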
@@ -595,10 +593,8 @@ sendQueryIdMessages targetID ns lParam targets = do
     -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing
     now <- getPOSIXTime
     -- collect cache entries from all responses
-    foldrM (\resp acc -> do
-        let
-            responseResult = queryResult <$> payload resp
-            entrySet = case responseResult of
+    foldM (\acc resp -> do
+        let entrySet = case queryResult <$> payload resp of
                 Just (FOUND result1)     -> Set.singleton (RemoteCacheEntry result1 now)
                 Just (FORWARD resultset) -> resultset
                 _                        -> Set.empty
@@ -608,15 +604,10 @@ sendQueryIdMessages targetID ns lParam targets = do
         -- return accumulated QueryResult
         pure $ case acc of
             -- once a FOUND as been encountered, return this as a result
-            FOUND{} -> acc
-            FORWARD accSet
-                | maybe False isFound responseResult -> fromJust responseResult
-                | otherwise -> FORWARD $ entrySet `Set.union` accSet
+            isFound@FOUND{} -> isFound
+            FORWARD accSet  -> FORWARD $ entrySet `Set.union` accSet
         ) (FORWARD Set.empty) responses
-    where
-        isFound FOUND{} = True
-        isFound _       = False
 -- | Create a QueryID message to be supplied to 'sendRequestTo'
 lookupMessage :: Integral i
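The left-hand side of the two hunks above folds the responses with foldrM and keeps a FOUND result once one appears, unioning FORWARD entry sets otherwise. A self-contained sketch of that accumulation pattern, with the project's result type replaced by a hypothetical stand-in so it runs on its own:

import Data.Foldable (foldrM)
import qualified Data.Set as Set

-- stand-in for the project's query result type
data QueryResult = Found Int | Forward (Set.Set Int) deriving Show

-- fold responses: a Found short-circuits, Forward responses keep merging their entry sets
accumulate :: [QueryResult] -> IO QueryResult
accumulate = foldrM step (Forward Set.empty)
  where
    step resp acc = pure $ case acc of
        Found _        -> acc                            -- keep the first Found
        Forward accSet -> case resp of
            Found n      -> Found n
            Forward more -> Forward (more `Set.union` accSet)

main :: IO ()
main = accumulate [Forward (Set.fromList [1, 2]), Found 7, Forward (Set.singleton 3)] >>= print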
@@ -658,7 +649,8 @@ requestStabilise ns neighbour = do
            )
            ([],[]) respSet
        -- update successfully responded neighbour in cache
-       maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet)
+       now <- getPOSIXTime
+       maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet)
        pure $ if null responsePreds && null responseSuccs
            then Left "no neighbours returned"
            else Right (responsePreds, responseSuccs)
@@ -774,14 +766,14 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
               -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
               -> IO ()
    sendAndAck responseQueue sock' remainingSends = do
-       sendMany sock' $ Map.elems remainingSends
+       sendMany sock $ Map.elems remainingSends
        -- if all requests have been acked/ responded to, return prematurely
        recvLoop sock' responseQueue remainingSends Set.empty Nothing
    recvLoop :: Socket
             -> TBQueue FediChordMessage      -- ^ the queue for putting in the received responses
             -> Map.Map Integer BS.ByteString -- ^ the remaining unacked request parts
             -> Set.Set Integer               -- ^ already received response part numbers
             -> Maybe Integer                 -- ^ total number of response parts if already known
             -> IO ()
    recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do
@@ -794,11 +786,10 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do
                    newTotalParts = if isFinalPart msg then Just (part msg) else totalParts
                    newRemaining = Map.delete (part msg) remainingSends'
                    newReceivedParts = Set.insert (part msg) receivedPartNums
-               if Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts
+               if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts
                    then pure ()
-                   else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts
+                   else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts
            -- drop errors and invalid messages
-           Right Request{} -> pure () -- expecting a response, not a request
            Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts
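The difference in the termination check above is easy to miss: one side tests the set of part numbers from before the current part was inserted, the other tests the updated set. A small standalone illustration with hypothetical values, not project code:

import qualified Data.Set as Set

main :: IO ()
main = do
    let receivedPartNums = Set.fromList [1, 2]               -- parts seen in earlier iterations
        currentPart      = 3 :: Integer
        totalParts       = 3 :: Int
        newReceivedParts = Set.insert currentPart receivedPartNums
    -- checking the updated set lets the loop stop once the final part arrives
    print (Set.size newReceivedParts == totalParts)          -- True
    -- checking the stale set misses the part just received and recurses once more
    print (Set.size receivedPartNums == totalParts)          -- False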
@@ -825,18 +816,6 @@ queueDeleteEntry :: NodeID
                  -> IO ()
 queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete
--- | enqueue the timestamp update and verification marking of an entry in the
--- global 'NodeCache'.
-queueUpdateVerifieds :: Foldable c
-                     => c NodeID
-                     -> LocalNodeState s
-                     -> IO ()
-queueUpdateVerifieds nIds ns = do
-    now <- getPOSIXTime
-    forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $
-        markCacheEntryAsVerified (Just now) nid'
 -- | retry an IO action at most *i* times until it delivers a result
 attempts :: Int          -- ^ number of retries *i*
          -> IO (Maybe a) -- ^ action to retry
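The removed queueUpdateVerifieds helper, like queueAddEntries and queueDeleteEntries above it, does not touch the cache directly: it enqueues a modification function on cacheWriteQueue to be applied later. A self-contained sketch of that queue-of-modifications pattern with simplified, hypothetical types; the real NodeCache and the code draining the queue live elsewhere in the project:

import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

-- stand-in for the project's NodeCache
type Cache = Map.Map Int String

main :: IO ()
main = do
    -- producers enqueue pure cache modifications instead of writing the cache themselves
    writeQueue <- newTQueueIO :: IO (TQueue (Cache -> Cache))
    cacheVar   <- newTVarIO (Map.empty :: Cache)
    atomically $ writeTQueue writeQueue (Map.insert 1 "verified")
    -- a consumer drains the queue and applies each modification in order
    atomically $ do
        modification <- readTQueue writeQueue
        modifyTVar' cacheVar modification
    readTVarIO cacheVar >>= print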

View file

@@ -166,7 +166,6 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
         currentlyResponsible <- liftEither lookupResp
         liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
         -- 2. then send a join to the currently responsible node
-        liftIO $ putStrLn "send a bootstrap Join"
         joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
         liftEither joinResult
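Both sides of this hunk use liftEither to re-raise an Either result inside the surrounding MonadError computation instead of pattern matching on it. A minimal, self-contained illustration of that pattern; the example function is hypothetical:

import Control.Monad.Except (ExceptT, MonadError, liftEither, runExceptT)

checkPositive :: Int -> Either String Int
checkPositive n
    | n > 0     = Right n
    | otherwise = Left "not positive"

-- liftEither turns a Left into a thrown error of the enclosing monad,
-- so the rest of the do block is skipped on failure
doubled :: MonadError String m => Int -> m Int
doubled n = do
    checked <- liftEither (checkPositive n)
    pure (checked * 2)

main :: IO ()
main = do
    runExceptT (doubled 21   :: ExceptT String IO Int) >>= print   -- Right 42
    runExceptT (doubled (-1) :: ExceptT String IO Int) >>= print   -- Left "not positive"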
@@ -245,24 +244,26 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
                 Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset
             )
             initCache resp
-    currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
-    pure currentlyResponsible
+    currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns
+    pure $ Right currentlyResponsible
 -- | join a node to the DHT using the global node cache
 -- node's position.
-fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s))
+fediChordVserverJoin :: Service s (RealNodeSTM s)
                      => LocalNodeStateSTM s -- ^ the local 'NodeState'
-                     -> m (LocalNodeStateSTM s) -- ^ the joined 'NodeState' after a
+                     -> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
                                                 -- successful join, otherwise an error message
 fediChordVserverJoin nsSTM = do
-    ns <- liftIO $ readTVarIO nsSTM
+    ns <- readTVarIO nsSTM
     -- 1. get routed to the currently responsible node
     currentlyResponsible <- requestQueryID ns $ getNid ns
-    liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
+    putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible)
     -- 2. then send a join to the currently responsible node
-    joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM
-    liftEither joinResult
+    joinResult <- requestJoin currentlyResponsible nsSTM
+    case joinResult of
+        Left err       -> pure . Left $ "Error joining on " <> err
+        Right joinedNS -> pure . Right $ joinedNS
 fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m ()
 fediChordVserverLeave ns = do
@@ -322,7 +323,7 @@ joinOnNewEntriesThread nsSTM = loop
                 pure ()
             -- otherwise try joining
             FORWARD _ -> do
-                joinResult <- runExceptT $ fediChordVserverJoin nsSTM
+                joinResult <- fediChordVserverJoin nsSTM
                 either
                     -- on join failure, sleep and retry
                     -- TODO: make delay configurable
@@ -503,26 +504,18 @@ stabiliseThread nsSTM = forever $ do
     -- try looking up additional neighbours if list too short
     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
-        either
-            (const $ pure ())
-            (\entry -> atomically $ do
-                latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addPredecessors [entry] latestNs
-            )
-            nextEntry
+        nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns')
+        atomically $ do
+            latestNs <- readTVar nsSTM
+            writeTVar nsSTM $ addPredecessors [nextEntry] latestNs
         )
     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do
         ns' <- readTVarIO nsSTM
-        nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
-        either
-            (const $ pure ())
-            (\entry -> atomically $ do
-                latestNs <- readTVar nsSTM
-                writeTVar nsSTM $ addSuccessors [entry] latestNs
-            )
-            nextEntry
+        nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns')
+        atomically $ do
+            latestNs <- readTVar nsSTM
+            writeTVar nsSTM $ addSuccessors [nextEntry] latestNs
         )
     newNs <- readTVarIO nsSTM
@@ -645,7 +638,7 @@ requestMapPurge :: MVar RequestMap -> IO ()
 requestMapPurge mapVar = forever $ do
     rMapState <- takeMVar mapVar
     now <- getPOSIXTime
-    putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts) ->
+    putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts) ->
         now - ts < responsePurgeAge
         ) rMapState
     threadDelay $ round responsePurgeAge * 2 * 10^6
@@ -764,7 +757,7 @@ getKeyResponsibility nodeSTM lookupKey = do
 -- new entry.
 -- If no vserver is active in the DHT, 'Nothing' is returned.
 updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber))
-updateLookupCache nodeSTM keyToLookup = do
+updateLookupCache nodeSTM lookupKey = do
     (node, lookupSource) <- atomically $ do
         node <- readTVar nodeSTM
         let firstVs = headMay (vservers node)
@@ -774,25 +767,18 @@ updateLookupCache nodeSTM keyToLookup = do
         pure (node, lookupSource)
     maybe (do
         -- if no local node available, delete cache entry and return Nothing
-        atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup
+        atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey
        pure Nothing
        )
        (\n -> do
            -- start a lookup from the node, update the cache with the lookup result and return it
-           -- TODO: better retry management, because having no vserver joined yet should
-           -- be treated differently than other reasons for not getting a result.
-           newResponsible <- runExceptT $ requestQueryID n keyToLookup
-           either
-               (const $ pure Nothing)
-               (\result -> do
-                   let newEntry = (getDomain result, getServicePort result)
-                   now <- getPOSIXTime
-                   -- atomic update against lost updates
-                   atomically $ modifyTVar' (lookupCacheSTM node) $
-                       Map.insert keyToLookup (CacheEntry False newEntry now)
-                   pure $ Just newEntry
-               )
-               newResponsible
+           newResponsible <- requestQueryID n lookupKey
+           let newEntry = (getDomain newResponsible, getServicePort newResponsible)
+           now <- getPOSIXTime
+           -- atomic update against lost updates
+           atomically $ modifyTVar' (lookupCacheSTM node) $
+               Map.insert lookupKey (CacheEntry False newEntry now)
+           pure $ Just newEntry
        ) lookupSource
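Both variants above wrap the cache insert in a single modifyTVar', per the "atomic update against lost updates" comment: the read-modify-write happens in one STM transaction rather than a separate read followed by a write. A self-contained sketch of that difference, using hypothetical keys and values:

import Control.Concurrent.STM
import qualified Data.Map.Strict as Map

main :: IO ()
main = do
    cacheVar <- newTVarIO (Map.empty :: Map.Map String Int)
    -- safe: the whole read-modify-write runs inside one STM transaction,
    -- so a concurrent insert cannot be overwritten
    atomically $ modifyTVar' cacheVar (Map.insert "responsible" 1)
    -- risky pattern, shown for contrast: an insert by another thread between
    -- this read and the write below would be silently lost
    snapshot <- readTVarIO cacheVar
    atomically $ writeTVar cacheVar (Map.insert "other" 2 snapshot)
    readTVarIO cacheVar >>= print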

View file

@@ -10,30 +10,37 @@ module Hash2Pub.PostService where
 import Control.Concurrent
 import Control.Concurrent.Async
+import Control.Concurrent.MVar
 import Control.Concurrent.STM
-import Control.Exception (Exception (..), try)
-import Control.Monad (foldM, forM, forM_, forever, void,
-                      when)
-import Control.Monad.IO.Class (liftIO)
+import Control.Concurrent.STM.TChan
+import Control.Concurrent.STM.TChan
+import Control.Concurrent.STM.TQueue
+import Control.Concurrent.STM.TVar
+import Control.Exception (Exception (..), try)
+import Control.Monad (foldM, forM, forM_, forever, when, void)
+import Control.Monad.IO.Class (liftIO)
+import Control.Monad.STM
 import Data.Bifunctor
 import qualified Data.ByteString.Lazy.UTF8 as BSUL
 import qualified Data.ByteString.UTF8 as BSU
 import qualified Data.HashMap.Strict as HMap
 import qualified Data.HashSet as HSet
 import Data.Maybe (fromMaybe, isJust)
 import Data.String (fromString)
 import qualified Data.Text.Lazy as Txt
-import Data.Text.Normalize (NormalizationMode (NFC), normalize)
+import Data.Text.Normalize (NormalizationMode (NFC),
+                            normalize)
 import Data.Time.Clock.POSIX
 import Data.Typeable (Typeable)
 import qualified Network.HTTP.Client as HTTP
 import qualified Network.HTTP.Types as HTTPT
 import System.Random
 import Text.Read (readEither)
 import qualified Network.Wai.Handler.Warp as Warp
 import Servant
 import Servant.Client
+import Servant.Server
 import Hash2Pub.FediChordTypes
 import Hash2Pub.RingMap
@@ -348,7 +355,7 @@ clientDeliverSubscriptions :: PostService d
     -> (String, Int) -- ^ hostname and port of instance to deliver to
     -> IO (Either String ()) -- Either signals success or failure
 clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-    -- collect tag interval
+    -- collect tag intearval
     intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
     -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
     -- extract subscribers and posts