Compare commits
No commits in common. "c1ce386b6599931b9e45268c18674457fd4f6ab0" and "96c1963a4f350f617820d96afae2d44d7ffdd749" have entirely different histories.
c1ce386b65
...
96c1963a4f
2 changed files with 4 additions and 59 deletions
|
@ -301,55 +301,6 @@ clientAPI = Proxy
|
||||||
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
|
relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postMultiFetchClient :<|> postInboxClient :<|> tagDeliveryClient :<|> tagSubscribeClient :<|> tagUnsubscribeClient = client clientAPI
|
||||||
|
|
||||||
|
|
||||||
-- | Deliver the subscriber list of all hashtags in the interval [fromTag, toTag]
|
|
||||||
-- and their outstanding delivery queue to another instance.
|
|
||||||
-- If the transfer succeeds, the transfered subscribers are removed from the local list.
|
|
||||||
clientDeliverSubscriptions :: PostService d
|
|
||||||
-> Hashtag -- ^ fromTag
|
|
||||||
-> Hashtag -- ^ toTag
|
|
||||||
-> (String, Int) -- ^ hostname and port of instance to deliver to
|
|
||||||
-> IO (Either String ()) -- Either signals success or failure
|
|
||||||
clientDeliverSubscriptions serv fromTag toTag (toHost, toPort) = do
|
|
||||||
-- collect tag intearval
|
|
||||||
intervalTags <- takeRMapSuccessorsFromTo (genKeyID $ Txt.unpack fromTag) (genKeyID $ Txt.unpack toTag) <$> readTVarIO (subscribers serv)
|
|
||||||
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
|
|
||||||
-- extract subscribers and posts
|
|
||||||
-- no need for extracting as a single atomic operation, as newly incoming posts are supposed to be rejected because of already having re-positioned on the DHT
|
|
||||||
subscriberData <- foldM (\response (subSTM, _, tag) -> do
|
|
||||||
subMap <- readTVarIO subSTM
|
|
||||||
thisTagsData <- foldM (\tagResponse (subscriber, (subChan, lease)) -> do
|
|
||||||
-- duplicate the pending queue to work on a copy, in case of a delivery error
|
|
||||||
pending <- atomically $ do
|
|
||||||
queueCopy <- cloneTChan subChan
|
|
||||||
channelGetAll queueCopy
|
|
||||||
if null pending
|
|
||||||
then pure tagResponse
|
|
||||||
else pure $ tag <> "," <> Txt.pack (show subscriber) <> "," <> Txt.pack (show lease) <> "," <> Txt.unwords pending <> "\n"
|
|
||||||
)
|
|
||||||
""
|
|
||||||
(HMap.toList subMap)
|
|
||||||
pure $ thisTagsData <> response
|
|
||||||
)
|
|
||||||
""
|
|
||||||
intervalTags
|
|
||||||
-- send subscribers
|
|
||||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
|
||||||
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv httpMan (BaseUrl Http toHost (fromIntegral toPort) ""))
|
|
||||||
-- on failure return a Left, otherwise delete subscription entry
|
|
||||||
case resp of
|
|
||||||
Left err -> pure . Left . show $ err
|
|
||||||
Right _ -> do
|
|
||||||
atomically $
|
|
||||||
modifyTVar' (subscribers serv) $ \tagMap ->
|
|
||||||
foldr deleteRMapEntry tagMap ((\(_, _, t) -> genKeyID . Txt.unpack $ t) <$> intervalTags)
|
|
||||||
pure . Right $ ()
|
|
||||||
where
|
|
||||||
channelGetAll :: TChan a -> STM [a]
|
|
||||||
channelGetAll chan = channelGetAll' chan []
|
|
||||||
channelGetAll' :: TChan a -> [a] -> STM [a]
|
|
||||||
channelGetAll' chan acc = do
|
|
||||||
haveRead <- tryReadTChan chan
|
|
||||||
maybe (pure acc) (\x -> channelGetAll' chan (x:acc)) haveRead
|
|
||||||
|
|
||||||
-- currently this is unused code
|
-- currently this is unused code
|
||||||
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
|
getClients :: String -> Int -> HTTP.Manager -> Client IO PostServiceAPI
|
||||||
|
@ -456,7 +407,7 @@ instance {-# OVERLAPPABLE #-} Read a => MimeUnrender PlainText a where
|
||||||
|
|
||||||
-- ====== worker threads ======
|
-- ====== worker threads ======
|
||||||
|
|
||||||
-- | process the pending relay inbox of incoming posts from the internal queue:
|
-- | process the pending relays of incoming posts from the internal queue:
|
||||||
-- Look up responsible relay node for given hashtag and forward post to it
|
-- Look up responsible relay node for given hashtag and forward post to it
|
||||||
processIncomingPosts :: DHT d => PostService d -> IO ()
|
processIncomingPosts :: DHT d => PostService d -> IO ()
|
||||||
processIncomingPosts serv = forever $ do
|
processIncomingPosts serv = forever $ do
|
||||||
|
@ -471,11 +422,5 @@ processIncomingPosts serv = forever $ do
|
||||||
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
httpMan <- HTTP.newManager HTTP.defaultManagerSettings
|
||||||
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
resp <- runClientM (relayInboxClient tag $ pID <> "," <> pContent) (mkClientEnv httpMan (BaseUrl Http responsibleHost (fromIntegral responsiblePort) ""))
|
||||||
case resp of
|
case resp of
|
||||||
Left err -> do
|
Left err -> putStrLn $ "Error: " <> show err
|
||||||
putStrLn $ "Error: " <> show err
|
|
||||||
-- 410 error indicates outdated responsibility mapping
|
|
||||||
-- Simplification: just invalidate the mapping entry on all errors, force a re-lookup and re-queue the post
|
|
||||||
-- TODO: keep track of maximum retries
|
|
||||||
_ <- forceLookupKey (baseDHT serv) (Txt.unpack tag)
|
|
||||||
atomically . writeTQueue (relayInQueue serv) $ (tag, pID, pContent)
|
|
||||||
Right yay -> putStrLn $ "Yay! " <> show yay
|
Right yay -> putStrLn $ "Yay! " <> show yay
|
||||||
|
|
|
@ -241,9 +241,9 @@ takeRMapPredecessorsFromTo :: (Bounded k, Ord k, Num k)
|
||||||
-> [a]
|
-> [a]
|
||||||
takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing []
|
takeRMapPredecessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupPred toVal fromVal Nothing []
|
||||||
|
|
||||||
takeRMapSuccessorsFromTo :: (Bounded k, Ord k, Num k)
|
takeRMapSuccesorsFromTo :: (Bounded k, Ord k, Num k)
|
||||||
=> k -- start value for taking
|
=> k -- start value for taking
|
||||||
-> k -- stop value for taking
|
-> k -- stop value for taking
|
||||||
-> RingMap k a
|
-> RingMap k a
|
||||||
-> [a]
|
-> [a]
|
||||||
takeRMapSuccessorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
|
takeRMapSuccesorsFromTo fromVal toVal rmap = takeEntriesUntil_ rmap rMapLookupSucc toVal fromVal Nothing []
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue