Compare commits
	
		
			No commits in common. "1b5fc039b308cee299077f16374c25bbc57aa60c" and "3bd4cb667db2ee4c94c2537e958cfd29e6819f72" have entirely different histories.
		
	
	
		
			1b5fc039b3
			...
			3bd4cb667d
		
	
		
					 5 changed files with 90 additions and 118 deletions
				
			
		|  | @ -47,7 +47,7 @@ extra-source-files:  CHANGELOG.md | |||
| 
 | ||||
| common deps | ||||
|   build-depends:       base ^>=4.12.0.0, containers ^>=0.6.0.1, bytestring, utf8-string ^>=1.0.1.1, network ^>=2.8.0.1, time ^>=1.8.0.2, cmdargs ^>= 0.10, cryptonite ^>= 0.25, memory, async, stm, asn1-encoding, asn1-types, asn1-parse, publicsuffix, network-byte-order, safe, iproute, mtl, random, servant, servant-server, servant-client, warp, text, unordered-containers, hashable, unicode-transforms, http-client, http-types, unbounded-delays | ||||
|   ghc-options:         -Wall -Wpartial-fields | ||||
|   ghc-options:         -Wall | ||||
| 
 | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -1,16 +1,16 @@ | |||
| {-# LANGUAGE OverloadedStrings #-} | ||||
| {-# LANGUAGE OverloadedStrings     #-} | ||||
| 
 | ||||
| module Main where | ||||
| 
 | ||||
| import           Control.Concurrent | ||||
| import           Control.Monad              (forM_) | ||||
| import           Control.Monad.IO.Class | ||||
| import           Control.Monad.State.Class | ||||
| import           Control.Monad.State.Strict (evalStateT) | ||||
| import qualified Network.HTTP.Client        as HTTP | ||||
| import           System.Random | ||||
| import System.Random | ||||
| import Control.Concurrent | ||||
| import Control.Monad (forM_) | ||||
| import Control.Monad.State.Class | ||||
| import Control.Monad.State.Strict (evalStateT) | ||||
| import Control.Monad.IO.Class | ||||
| import qualified Network.HTTP.Client           as HTTP | ||||
| 
 | ||||
| import           Hash2Pub.PostService       (Hashtag, clientPublishPost) | ||||
| import Hash2Pub.PostService     (clientPublishPost, Hashtag) | ||||
| 
 | ||||
| -- placeholder post data definition | ||||
| 
 | ||||
|  |  | |||
|  | @ -49,11 +49,9 @@ import           Control.Concurrent.STM.TQueue | |||
| import           Control.Concurrent.STM.TVar | ||||
| import           Control.Exception | ||||
| import           Control.Monad                  (foldM, forM, forM_, void, when) | ||||
| import           Control.Monad.Except           (MonadError (..), runExceptT) | ||||
| import           Control.Monad.IO.Class         (MonadIO (..)) | ||||
| import qualified Data.ByteString                as BS | ||||
| import           Data.Either                    (rights) | ||||
| import           Data.Foldable                  (foldl', foldr', foldrM) | ||||
| import           Data.Foldable                  (foldl', foldr') | ||||
| import           Data.Functor.Identity | ||||
| import           Data.IP                        (IPv6, fromHostAddress6, | ||||
|                                                  toHostAddress6) | ||||
|  | @ -516,28 +514,28 @@ requestJoin toJoinOn ownStateSTM = do | |||
|                     ([], Set.empty, Set.empty) | ||||
|                     responses | ||||
|                 -- sort, slice and set the accumulated successors and predecessors | ||||
|                 -- the contacted node itself is a successor as well and, with few | ||||
|                 -- nodes, can be a predecessor as well | ||||
|                 newState = setSuccessors (toRemoteNodeState toJoinOn:Set.elems succAccSet) . setPredecessors (toRemoteNodeState toJoinOn:Set.elems predAccSet) $ stateSnap | ||||
|                 newState = setSuccessors (Set.elems succAccSet) . setPredecessors (Set.elems predAccSet) $ stateSnap | ||||
|             writeTVar ownStateSTM newState | ||||
|             pure (cacheInsertQ, newState) | ||||
|         -- execute the cache insertions | ||||
|         mapM_ (\f -> f joinedState) cacheInsertQ | ||||
|         if responses == Set.empty | ||||
|                   then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn) | ||||
|            else do | ||||
|                 -- wait for migration data to be completely received | ||||
|                 waitForMigrationFrom (nodeService prn) (getNid toJoinOn) | ||||
|                 pure $ Right ownStateSTM | ||||
|            else if null (predecessors joinedState) && null (successors joinedState) | ||||
|                 then pure $ Left "join error: no predecessors or successors" | ||||
|                 -- successful join | ||||
|                 else do | ||||
|                     -- wait for migration data to be completely received | ||||
|                     waitForMigrationFrom (nodeService prn) (getNid ownState) | ||||
|                     pure $ Right ownStateSTM | ||||
|                                    ) | ||||
|         `catch` (\e -> pure . Left $ displayException (e :: IOException)) | ||||
| 
 | ||||
| 
 | ||||
| -- | Send a 'QueryID' 'Request' for getting the node that handles a certain key ID. | ||||
| requestQueryID :: (MonadIO m, MonadError String m) | ||||
|                => LocalNodeState s        -- ^ NodeState of the querying node | ||||
| requestQueryID :: LocalNodeState s        -- ^ NodeState of the querying node | ||||
|                -> NodeID                -- ^ target key ID to look up | ||||
|                -> m RemoteNodeState    -- ^ the node responsible for handling that key | ||||
|                -> IO RemoteNodeState    -- ^ the node responsible for handling that key | ||||
| -- 1. do a local lookup for the l closest nodes | ||||
| -- 2. create l sockets | ||||
| -- 3. send a message async concurrently to all l nodes | ||||
|  | @ -545,23 +543,23 @@ requestQueryID :: (MonadIO m, MonadError String m) | |||
| -- 5. repeat until FOUND (problem: new entries not necessarily already in cache, explicitly compare with closer results) | ||||
| -- TODO: deal with lookup failures | ||||
| requestQueryID ns targetID = do | ||||
|     firstCacheSnapshot <- liftIO . readTVarIO . nodeCacheSTM $ ns | ||||
|     firstCacheSnapshot <- readTVarIO . nodeCacheSTM $ ns | ||||
|     -- TODO: make maxAttempts configurable | ||||
|     queryIdLookupLoop firstCacheSnapshot ns 50 targetID | ||||
| 
 | ||||
| -- | like 'requestQueryID, but allows passing of a custom cache, e.g. for joining | ||||
| queryIdLookupLoop :: (MonadIO m, MonadError String m) => NodeCache -> LocalNodeState s -> Int -> NodeID -> m RemoteNodeState | ||||
| queryIdLookupLoop :: NodeCache -> LocalNodeState s -> Int -> NodeID -> IO RemoteNodeState | ||||
| -- return node itself as default fallback value against infinite recursion. | ||||
| -- TODO: consider using an Either instead of a default value | ||||
| queryIdLookupLoop _ ns 0 _ = throwError "exhausted maximum lookup attempts" | ||||
| queryIdLookupLoop _ ns 0 _ = pure $ toRemoteNodeState ns | ||||
| queryIdLookupLoop cacheSnapshot ns maxAttempts targetID = do | ||||
|     let localResult = queryLocalCache ns cacheSnapshot (lNumBestNodes ns) targetID | ||||
|     -- FOUND can only be returned if targetID is owned by local node | ||||
|     case localResult of | ||||
|       FOUND thisNode -> pure thisNode | ||||
|       FORWARD nodeSet -> do | ||||
|           responseEntries <- liftIO $ sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) | ||||
|           now <- liftIO getPOSIXTime | ||||
|           responseEntries <- sendQueryIdMessages targetID ns Nothing (remoteNode <$> Set.elems nodeSet) | ||||
|           now <- getPOSIXTime | ||||
|           -- check for a FOUND and return it | ||||
|           case responseEntries of | ||||
|             FOUND foundNode -> pure foundNode | ||||
|  | @ -595,10 +593,8 @@ sendQueryIdMessages targetID ns lParam targets = do | |||
|           -- insert new cache entries both into global cache as well as return accumulated QueryResponses for further processing | ||||
|           now <- getPOSIXTime | ||||
|           -- collect cache entries from all responses | ||||
|           foldrM (\resp acc -> do | ||||
|             let | ||||
|                 responseResult = queryResult <$> payload resp | ||||
|                 entrySet = case responseResult of | ||||
|           foldM (\acc resp -> do | ||||
|             let entrySet = case queryResult <$> payload resp of | ||||
|                              Just (FOUND result1) -> Set.singleton (RemoteCacheEntry result1 now) | ||||
|                              Just (FORWARD resultset) -> resultset | ||||
|                              _ -> Set.empty | ||||
|  | @ -608,15 +604,10 @@ sendQueryIdMessages targetID ns lParam targets = do | |||
|             -- return accumulated QueryResult | ||||
|             pure $ case acc of | ||||
|               -- once a FOUND as been encountered, return this as a result | ||||
|               FOUND{} -> acc | ||||
|               FORWARD accSet | ||||
|                 | maybe False isFound responseResult -> fromJust responseResult | ||||
|                 | otherwise -> FORWARD $ entrySet `Set.union` accSet | ||||
|               isFound@FOUND{} -> isFound | ||||
|               FORWARD accSet  -> FORWARD $ entrySet `Set.union` accSet | ||||
| 
 | ||||
|                             ) (FORWARD Set.empty) responses | ||||
|   where | ||||
|       isFound FOUND{} = True | ||||
|       isFound _       = False | ||||
| 
 | ||||
| -- | Create a QueryID message to be supplied to 'sendRequestTo' | ||||
| lookupMessage :: Integral i | ||||
|  | @ -658,7 +649,8 @@ requestStabilise ns neighbour = do | |||
|                                                       ) | ||||
|                                                       ([],[]) respSet | ||||
|             -- update successfully responded neighbour in cache | ||||
|             maybe (pure ()) (\p -> queueUpdateVerifieds (Identity $ senderID p) ns) $ headMay (Set.elems respSet) | ||||
|             now <- getPOSIXTime | ||||
|             maybe (pure ()) (\p -> queueAddEntries (Identity $ RemoteCacheEntry (sender p) now) ns) $ headMay (Set.elems respSet) | ||||
|             pure $ if null responsePreds && null responseSuccs | ||||
|                       then Left "no neighbours returned" | ||||
|                       else Right (responsePreds, responseSuccs) | ||||
|  | @ -774,14 +766,14 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do | |||
|                -> Map.Map Integer BS.ByteString     -- ^ the remaining unacked request parts | ||||
|                -> IO () | ||||
|     sendAndAck responseQueue sock' remainingSends = do | ||||
|         sendMany sock' $ Map.elems remainingSends | ||||
|         sendMany sock $ Map.elems remainingSends | ||||
|         -- if all requests have been acked/ responded to, return prematurely | ||||
|         recvLoop sock' responseQueue remainingSends Set.empty Nothing | ||||
|     recvLoop :: Socket | ||||
|              -> TBQueue FediChordMessage          -- ^ the queue for putting in the received responses | ||||
|              -> Map.Map Integer BS.ByteString       -- ^ the remaining unacked request parts | ||||
|              -> Set.Set Integer                     -- ^ already received response part numbers | ||||
|              -> Maybe Integer                     -- ^ total number of response parts if already known | ||||
|                -> Maybe Integer                     -- ^ total number of response parts if already known | ||||
|              -> IO () | ||||
|     recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts = do | ||||
|         -- 65535 is maximum length of UDP packets, as long as | ||||
|  | @ -794,11 +786,10 @@ sendRequestToWithParams timeoutMillis numAttempts msgIncomplete sock = do | |||
|                 newTotalParts = if isFinalPart msg then Just (part msg) else totalParts | ||||
|                 newRemaining = Map.delete (part msg) remainingSends' | ||||
|                 newReceivedParts = Set.insert (part msg) receivedPartNums | ||||
|               if  Map.null newRemaining && maybe False (\p -> Set.size newReceivedParts == fromIntegral p) newTotalParts | ||||
|               if Map.null newRemaining && maybe False (\p -> Set.size receivedPartNums == fromIntegral p) newTotalParts | ||||
|                  then pure () | ||||
|                  else recvLoop sock' responseQueue newRemaining newReceivedParts newTotalParts | ||||
|                  else recvLoop sock' responseQueue newRemaining receivedPartNums newTotalParts | ||||
|           -- drop errors and invalid messages | ||||
|           Right Request{} -> pure ()    -- expecting a response, not a request | ||||
|           Left _ -> recvLoop sock' responseQueue remainingSends' receivedPartNums totalParts | ||||
| 
 | ||||
| 
 | ||||
|  | @ -825,18 +816,6 @@ queueDeleteEntry :: NodeID | |||
|                  -> IO () | ||||
| queueDeleteEntry toDelete = queueDeleteEntries $ Identity toDelete | ||||
| 
 | ||||
| 
 | ||||
| -- | enqueue the timestamp update and verification marking of an entry in the | ||||
| -- global 'NodeCache'. | ||||
| queueUpdateVerifieds :: Foldable c | ||||
|                      => c NodeID | ||||
|                      -> LocalNodeState s | ||||
|                      -> IO () | ||||
| queueUpdateVerifieds nIds ns = do | ||||
|     now <- getPOSIXTime | ||||
|     forM_ nIds $ \nid' -> atomically $ writeTQueue (cacheWriteQueue ns) $ | ||||
|         markCacheEntryAsVerified (Just now) nid' | ||||
| 
 | ||||
| -- | retry an IO action at most *i* times until it delivers a result | ||||
| attempts :: Int             -- ^ number of retries *i* | ||||
|          -> IO (Maybe a)    -- ^ action to retry | ||||
|  |  | |||
|  | @ -166,7 +166,6 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do | |||
|         currentlyResponsible <- liftEither lookupResp | ||||
|         liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) | ||||
|         -- 2. then send a join to the currently responsible node | ||||
|         liftIO $ putStrLn "send a bootstrap Join" | ||||
|         joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM | ||||
|         liftEither joinResult | ||||
| 
 | ||||
|  | @ -245,24 +244,26 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do | |||
|                            Just (FORWARD resultset) -> foldr' (addCacheEntryPure now) cacheAcc resultset | ||||
|                               ) | ||||
|                            initCache resp | ||||
|                currentlyResponsible <- runExceptT $ queryIdLookupLoop bootstrapCache ns 50 $ getNid ns | ||||
|                pure currentlyResponsible | ||||
|                currentlyResponsible <- queryIdLookupLoop bootstrapCache ns 50 $ getNid ns | ||||
|                pure $ Right currentlyResponsible | ||||
| 
 | ||||
| 
 | ||||
| -- | join a node to the DHT using the global node cache | ||||
| -- node's position. | ||||
| fediChordVserverJoin :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) | ||||
| fediChordVserverJoin :: Service s (RealNodeSTM s) | ||||
|                      => LocalNodeStateSTM s                    -- ^ the local 'NodeState' | ||||
|                      -> m (LocalNodeStateSTM s)  -- ^ the joined 'NodeState' after a | ||||
|                      -> IO (Either String (LocalNodeStateSTM s))  -- ^ the joined 'NodeState' after a | ||||
|                                                     -- successful join, otherwise an error message | ||||
| fediChordVserverJoin nsSTM = do | ||||
|     ns <- liftIO $ readTVarIO nsSTM | ||||
|     ns <- readTVarIO nsSTM | ||||
|     -- 1. get routed to the currently responsible node | ||||
|     currentlyResponsible <- requestQueryID ns $ getNid ns | ||||
|     liftIO . putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) | ||||
|     putStrLn $ "Trying to join on " <> show (getNid currentlyResponsible) | ||||
|     -- 2. then send a join to the currently responsible node | ||||
|     joinResult <- liftIO $ requestJoin currentlyResponsible nsSTM | ||||
|     liftEither joinResult | ||||
|     joinResult <- requestJoin currentlyResponsible nsSTM | ||||
|     case joinResult of | ||||
|       Left err       -> pure . Left $ "Error joining on " <> err | ||||
|       Right joinedNS -> pure . Right $ joinedNS | ||||
| 
 | ||||
| fediChordVserverLeave :: (MonadError String m, MonadIO m, Service s (RealNodeSTM s)) => LocalNodeState s -> m () | ||||
| fediChordVserverLeave ns = do | ||||
|  | @ -322,7 +323,7 @@ joinOnNewEntriesThread nsSTM = loop | |||
|               pure () | ||||
|           -- otherwise try joining | ||||
|           FORWARD _ -> do | ||||
|               joinResult <- runExceptT $ fediChordVserverJoin nsSTM | ||||
|               joinResult <- fediChordVserverJoin nsSTM | ||||
|               either | ||||
|                 -- on join failure, sleep and retry | ||||
|                 -- TODO: make delay configurable | ||||
|  | @ -503,26 +504,18 @@ stabiliseThread nsSTM = forever $ do | |||
|     -- try looking up additional neighbours if list too short | ||||
|     forM_ [(length $ predecessors updatedNs)..(kNeighbours updatedNs)] (\_ -> do | ||||
|         ns' <- readTVarIO nsSTM | ||||
|         nextEntry <- runExceptT . requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') | ||||
|         either | ||||
|             (const $ pure ()) | ||||
|             (\entry -> atomically $ do | ||||
|                 latestNs <- readTVar nsSTM | ||||
|                 writeTVar nsSTM $ addPredecessors [entry] latestNs | ||||
|             ) | ||||
|             nextEntry | ||||
|         nextEntry <- requestQueryID ns' $ pred . getNid $ lastDef (toRemoteNodeState ns') (predecessors ns') | ||||
|         atomically $ do | ||||
|             latestNs <- readTVar nsSTM | ||||
|             writeTVar nsSTM $ addPredecessors [nextEntry] latestNs | ||||
|                                                                        ) | ||||
| 
 | ||||
|     forM_ [(length $ successors updatedNs)..(kNeighbours updatedNs)] (\_ -> do | ||||
|         ns' <- readTVarIO nsSTM | ||||
|         nextEntry <- runExceptT . requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') | ||||
|         either | ||||
|             (const $ pure ()) | ||||
|             (\entry -> atomically $ do | ||||
|                 latestNs <- readTVar nsSTM | ||||
|                 writeTVar nsSTM $ addSuccessors [entry] latestNs | ||||
|             ) | ||||
|             nextEntry | ||||
|         nextEntry <- requestQueryID ns' $ succ . getNid $ lastDef (toRemoteNodeState ns') (successors ns') | ||||
|         atomically $ do | ||||
|             latestNs <- readTVar nsSTM | ||||
|             writeTVar nsSTM $ addSuccessors [nextEntry] latestNs | ||||
|                                                                      ) | ||||
| 
 | ||||
|     newNs <- readTVarIO nsSTM | ||||
|  | @ -645,7 +638,7 @@ requestMapPurge :: MVar RequestMap -> IO () | |||
| requestMapPurge mapVar = forever $ do | ||||
|     rMapState <- takeMVar mapVar | ||||
|     now <- getPOSIXTime | ||||
|     putMVar mapVar $ Map.filter (\(RequestMapEntry _ _ ts)  -> | ||||
|     putMVar mapVar $ Map.filter (\entry@(RequestMapEntry _ _ ts)  -> | ||||
|         now - ts < responsePurgeAge | ||||
|                                 ) rMapState | ||||
|     threadDelay $ round responsePurgeAge * 2 * 10^6 | ||||
|  | @ -764,7 +757,7 @@ getKeyResponsibility nodeSTM lookupKey = do | |||
| -- new entry. | ||||
| -- If no vserver is active in the DHT, 'Nothing' is returned. | ||||
| updateLookupCache :: RealNodeSTM s -> NodeID -> IO (Maybe (String, PortNumber)) | ||||
| updateLookupCache nodeSTM keyToLookup = do | ||||
| updateLookupCache nodeSTM lookupKey = do | ||||
|     (node, lookupSource) <- atomically $ do | ||||
|         node <- readTVar nodeSTM | ||||
|         let firstVs = headMay (vservers node) | ||||
|  | @ -774,25 +767,18 @@ updateLookupCache nodeSTM keyToLookup = do | |||
|         pure (node, lookupSource) | ||||
|     maybe (do | ||||
|         -- if no local node available, delete cache entry and return Nothing | ||||
|         atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete keyToLookup | ||||
|         atomically $ modifyTVar' (lookupCacheSTM node) $ Map.delete lookupKey | ||||
|         pure Nothing | ||||
|           ) | ||||
|           (\n -> do | ||||
|         -- start a lookup from the node, update the cache with the lookup result and return it | ||||
|         -- TODO: better retry management, because having no vserver joined yet should | ||||
|         -- be treated differently than other reasons for not getting a result. | ||||
|         newResponsible <- runExceptT $ requestQueryID n keyToLookup | ||||
|         either | ||||
|             (const $ pure Nothing) | ||||
|             (\result -> do | ||||
|                 let newEntry = (getDomain result, getServicePort result) | ||||
|                 now <- getPOSIXTime | ||||
|                 -- atomic update against lost updates | ||||
|                 atomically $ modifyTVar' (lookupCacheSTM node) $ | ||||
|                     Map.insert keyToLookup (CacheEntry False newEntry now) | ||||
|                 pure $ Just newEntry | ||||
|             ) | ||||
|             newResponsible | ||||
|         newResponsible <- requestQueryID n lookupKey | ||||
|         let newEntry = (getDomain newResponsible, getServicePort newResponsible) | ||||
|         now <- getPOSIXTime | ||||
|         -- atomic update against lost updates | ||||
|         atomically $ modifyTVar' (lookupCacheSTM node) $ | ||||
|             Map.insert lookupKey (CacheEntry False newEntry now) | ||||
|         pure $ Just newEntry | ||||
|            ) lookupSource | ||||
| 
 | ||||
| 
 | ||||
|  |  | |||
|  | @ -10,30 +10,37 @@ module Hash2Pub.PostService where | |||
| 
 | ||||
| import           Control.Concurrent | ||||
| import           Control.Concurrent.Async | ||||
| import           Control.Concurrent.MVar | ||||
| import           Control.Concurrent.STM | ||||
| import           Control.Exception         (Exception (..), try) | ||||
| import           Control.Monad             (foldM, forM, forM_, forever, void, | ||||
|                                             when) | ||||
| import           Control.Monad.IO.Class    (liftIO) | ||||
| import           Control.Concurrent.STM.TChan | ||||
| import           Control.Concurrent.STM.TChan | ||||
| import           Control.Concurrent.STM.TQueue | ||||
| import           Control.Concurrent.STM.TVar | ||||
| import           Control.Exception             (Exception (..), try) | ||||
| import           Control.Monad                 (foldM, forM, forM_, forever, when, void) | ||||
| import           Control.Monad.IO.Class        (liftIO) | ||||
| import           Control.Monad.STM | ||||
| import           Data.Bifunctor | ||||
| import qualified Data.ByteString.Lazy.UTF8 as BSUL | ||||
| import qualified Data.ByteString.UTF8      as BSU | ||||
| import qualified Data.HashMap.Strict       as HMap | ||||
| import qualified Data.HashSet              as HSet | ||||
| import           Data.Maybe                (fromMaybe, isJust) | ||||
| import           Data.String               (fromString) | ||||
| import qualified Data.Text.Lazy            as Txt | ||||
| import           Data.Text.Normalize       (NormalizationMode (NFC), normalize) | ||||
| import qualified Data.ByteString.Lazy.UTF8     as BSUL | ||||
| import qualified Data.ByteString.UTF8          as BSU | ||||
| import qualified Data.HashMap.Strict           as HMap | ||||
| import qualified Data.HashSet                  as HSet | ||||
| import           Data.Maybe                    (fromMaybe, isJust) | ||||
| import           Data.String                   (fromString) | ||||
| import qualified Data.Text.Lazy                as Txt | ||||
| import           Data.Text.Normalize           (NormalizationMode (NFC), | ||||
|                                                 normalize) | ||||
| import           Data.Time.Clock.POSIX | ||||
| import           Data.Typeable             (Typeable) | ||||
| import qualified Network.HTTP.Client       as HTTP | ||||
| import qualified Network.HTTP.Types        as HTTPT | ||||
| import           Data.Typeable                 (Typeable) | ||||
| import qualified Network.HTTP.Client           as HTTP | ||||
| import qualified Network.HTTP.Types            as HTTPT | ||||
| import           System.Random | ||||
| import           Text.Read                 (readEither) | ||||
| import           Text.Read                     (readEither) | ||||
| 
 | ||||
| import qualified Network.Wai.Handler.Warp  as Warp | ||||
| import qualified Network.Wai.Handler.Warp      as Warp | ||||
| import           Servant | ||||
| import           Servant.Client | ||||
| import           Servant.Server | ||||
| 
 | ||||
| import           Hash2Pub.FediChordTypes | ||||
| import           Hash2Pub.RingMap | ||||
|  | @ -348,7 +355,7 @@ clientDeliverSubscriptions :: PostService d | |||
|                            -> (String, Int)     -- ^ hostname and port of instance to deliver to | ||||
|                            -> IO (Either String ())     -- Either signals success or failure | ||||
| clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do | ||||
|     -- collect tag interval | ||||
|     -- collect tag intearval | ||||
|     intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) | ||||
|     -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] | ||||
|     -- extract subscribers and posts | ||||
|  |  | |||
		Loading…
	
	Add table
		Add a link
		
	
		Reference in a new issue