Compare commits

..

4 commits

4 changed files with 89 additions and 41 deletions

View file

@ -41,13 +41,14 @@ module Hash2Pub.DHTProtocol
) )
where where
import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TBQueue import Control.Concurrent.STM.TBQueue
import Control.Concurrent.STM.TQueue import Control.Concurrent.STM.TQueue
import Control.Concurrent.STM.TVar import Control.Concurrent.STM.TVar
import Control.Exception import Control.Exception
import Control.Monad (foldM, forM, forM_) import Control.Monad (foldM, forM, forM_, when)
import qualified Data.ByteString as BS import qualified Data.ByteString as BS
import Data.Either (rights) import Data.Either (rights)
import Data.Foldable (foldl', foldr') import Data.Foldable (foldl', foldr')
@ -75,10 +76,11 @@ import Hash2Pub.FediChordTypes (CacheEntry (..),
LocalNodeState (..), LocalNodeState (..),
LocalNodeStateSTM, NodeCache, LocalNodeStateSTM, NodeCache,
NodeID, NodeState (..), NodeID, NodeState (..),
RealNode (..), RealNode (..), RealNodeSTM,
RemoteNodeState (..), RemoteNodeState (..),
RingEntry (..), RingMap (..), RingEntry (..), RingMap (..),
addRMapEntry, addRMapEntryWith, Service (..), addRMapEntry,
addRMapEntryWith,
cacheGetNodeStateUnvalidated, cacheGetNodeStateUnvalidated,
cacheLookup, cacheLookupPred, cacheLookup, cacheLookupPred,
cacheLookupSucc, genNodeID, cacheLookupSucc, genNodeID,
@ -250,7 +252,8 @@ ackRequest _ _ = Map.empty
-- | Dispatch incoming requests to the dedicated handling and response function, and enqueue -- | Dispatch incoming requests to the dedicated handling and response function, and enqueue
-- the response to be sent. -- the response to be sent.
handleIncomingRequest :: LocalNodeStateSTM s -- ^ the handling node handleIncomingRequest :: Service s (RealNodeSTM s)
=> LocalNodeStateSTM s -- ^ the handling node
-> TQueue (BS.ByteString, SockAddr) -- ^ send queue -> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> Set.Set FediChordMessage -- ^ all parts of the request to handle -> Set.Set FediChordMessage -- ^ all parts of the request to handle
-> SockAddr -- ^ source address of the request -> SockAddr -- ^ source address of the request
@ -422,10 +425,10 @@ respondPing nsSTM msgSet = do
-- this modifies node state, so locking and IO seems to be necessary. -- this modifies node state, so locking and IO seems to be necessary.
-- Still try to keep as much code as possible pure -- Still try to keep as much code as possible pure
respondJoin :: LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString) respondJoin :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> Set.Set FediChordMessage -> IO (Map.Map Integer BS.ByteString)
respondJoin nsSTM msgSet = do respondJoin nsSTM msgSet = do
-- atomically read and modify the node state according to the parsed request -- atomically read and modify the node state according to the parsed request
responseMsg <- atomically $ do (dataMigration, responseMsg) <- atomically $ do
nsSnap <- readTVar nsSTM nsSnap <- readTVar nsSTM
cache <- readTVar $ nodeCacheSTM nsSnap cache <- readTVar $ nodeCacheSTM nsSnap
let let
@ -455,24 +458,31 @@ respondJoin nsSTM msgSet = do
, payload = Just responsePayload , payload = Just responsePayload
} }
writeTVar nsSTM joinedNS writeTVar nsSTM joinedNS
pure joinResponse ownService <- nodeService <$> readTVar (parentRealNode nsSnap)
let
serviceDataMigrator = migrateData ownService (getNid nsSnap) lowerKeyBound (getNid senderNS) (getDomain senderNS, fromIntegral $ getServicePort senderNS)
lowerKeyBound = maybe (getNid nsSnap) getNid $ headMay (predecessors nsSnap)
pure (Just serviceDataMigrator, joinResponse)
-- otherwise respond with empty payload -- otherwise respond with empty payload
else pure Response { else pure (Nothing, Response {
requestID = requestID aRequestPart requestID = requestID aRequestPart
, senderID = getNid nsSnap , senderID = getNid nsSnap
, part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1 , part = if Set.size msgSet == 1 then 1 else fromIntegral $ Set.size msgSet + 1
, isFinalPart = False , isFinalPart = False
, action = Join , action = Join
, payload = Nothing , payload = Nothing
} })
-- as DHT response is required immediately, fork the service data migration push
-- into a new thread. That's kind of ugly but the best I can think of so far
when (isJust dataMigration) (forkIO (fromJust dataMigration >> pure ()) >> pure ())
pure $ serialiseMessage sendMessageSize responseMsg pure $ serialiseMessage sendMessageSize responseMsg
-- TODO: notify service layer to copy over data now handled by the new joined node -- TODO: notify service layer to copy over data now handled by the new joined node
-- ....... request sending ....... -- ....... request sending .......
-- | send a join request and return the joined 'LocalNodeState' including neighbours -- | send a join request and return the joined 'LocalNodeState' including neighbours
requestJoin :: NodeState a => a -- ^ currently responsible node to be contacted requestJoin :: (NodeState a, Service s (RealNodeSTM s)) => a -- ^ currently responsible node to be contacted
-> LocalNodeStateSTM s -- ^ joining NodeState -> LocalNodeStateSTM s -- ^ joining NodeState
-> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information -> IO (Either String (LocalNodeStateSTM s)) -- ^ node after join with all its new information
requestJoin toJoinOn ownStateSTM = do requestJoin toJoinOn ownStateSTM = do
@ -511,12 +521,15 @@ requestJoin toJoinOn ownStateSTM = do
pure (cacheInsertQ, newState) pure (cacheInsertQ, newState)
-- execute the cache insertions -- execute the cache insertions
mapM_ (\f -> f joinedState) cacheInsertQ mapM_ (\f -> f joinedState) cacheInsertQ
pure $ if responses == Set.empty if responses == Set.empty
then Left $ "join error: got no response from " <> show (getNid toJoinOn) then pure . Left $ "join error: got no response from " <> show (getNid toJoinOn)
else if null (predecessors joinedState) && null (successors joinedState) else if null (predecessors joinedState) && null (successors joinedState)
then Left "join error: no predecessors or successors" then pure $ Left "join error: no predecessors or successors"
-- successful join -- successful join
else Right ownStateSTM else do
-- wait for migration data to be completely received
waitForMigrationFrom (nodeService prn) (getNid ownState)
pure $ Right ownStateSTM
) )
`catch` (\e -> pure . Left $ displayException (e :: IOException)) `catch` (\e -> pure . Left $ displayException (e :: IOException))

View file

@ -152,9 +152,10 @@ nodeStateInit realNodeSTM = do
-- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed -- | Join a new node into the DHT, using a provided bootstrap node as initial cache seed
-- for resolving the new node's position. -- for resolving the new node's position.
fediChordBootstrapJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' fediChordBootstrapJoin :: Service s (RealNodeSTM s)
-> (String, PortNumber) -- ^ domain and port of a bootstrapping node => LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a -> (String, PortNumber) -- ^ domain and port of a bootstrapping node
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordBootstrapJoin nsSTM bootstrapNode = do fediChordBootstrapJoin nsSTM bootstrapNode = do
-- can be invoked multiple times with all known bootstrapping nodes until successfully joined -- can be invoked multiple times with all known bootstrapping nodes until successfully joined
@ -170,7 +171,7 @@ fediChordBootstrapJoin nsSTM bootstrapNode = do
-- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters. -- Periodically lookup own ID through a random bootstrapping node to discover and merge separated DHT clusters.
-- Unjoined try joining instead. -- Unjoined try joining instead.
convergenceSampleThread :: LocalNodeStateSTM s -> IO () convergenceSampleThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
convergenceSampleThread nsSTM = forever $ do convergenceSampleThread nsSTM = forever $ do
nsSnap <- readTVarIO nsSTM nsSnap <- readTVarIO nsSTM
parentNode <- readTVarIO $ parentRealNode nsSnap parentNode <- readTVarIO $ parentRealNode nsSnap
@ -201,7 +202,7 @@ convergenceSampleThread nsSTM = forever $ do
-- | Try joining the DHT through any of the bootstrapping nodes until it succeeds. -- | Try joining the DHT through any of the bootstrapping nodes until it succeeds.
tryBootstrapJoining :: LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s)) tryBootstrapJoining :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO (Either String (LocalNodeStateSTM s))
tryBootstrapJoining nsSTM = do tryBootstrapJoining nsSTM = do
bss <- atomically $ do bss <- atomically $ do
nsSnap <- readTVar nsSTM nsSnap <- readTVar nsSTM
@ -249,8 +250,9 @@ bootstrapQueryId nsSTM (bootstrapHost, bootstrapPort) targetID = do
-- | join a node to the DHT using the global node cache -- | join a node to the DHT using the global node cache
-- node's position. -- node's position.
fediChordVserverJoin :: LocalNodeStateSTM s -- ^ the local 'NodeState' fediChordVserverJoin :: Service s (RealNodeSTM s)
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a => LocalNodeStateSTM s -- ^ the local 'NodeState'
-> IO (Either String (LocalNodeStateSTM s)) -- ^ the joined 'NodeState' after a
-- successful join, otherwise an error message -- successful join, otherwise an error message
fediChordVserverJoin nsSTM = do fediChordVserverJoin nsSTM = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
@ -281,10 +283,10 @@ fediChordVserverLeave ns = do
(\neighb -> do (\neighb -> do
leaveResponse <- requestLeave ns neighb leaveResponse <- requestLeave ns neighb
case leaveResponse of case leaveResponse of
Left _ -> sendUntilSuccess (i+1) Left _ -> sendUntilSuccess (i+1)
-- return first successfully contacted neighbour, -- return first successfully contacted neighbour,
-- so it can be contacted by the service layer for migration -- so it can be contacted by the service layer for migration
Right _ -> pure $ Right neighb Right _ -> pure $ Right neighb
) )
$ atMay (successors ns) i $ atMay (successors ns) i
migrateSuccessor :: (MonadError String m, MonadIO m) => m () migrateSuccessor :: (MonadError String m, MonadIO m) => m ()
@ -293,15 +295,18 @@ fediChordVserverLeave ns = do
successorLeave <- liftIO $ sendUntilSuccess 0 successorLeave <- liftIO $ sendUntilSuccess 0
-- trigger service data transfer for abandoned key space -- trigger service data transfer for abandoned key space
migrateToNode <- liftEither successorLeave migrateToNode <- liftEither successorLeave
let lowerKeyBound = maybe (getNid ns) getNid $ headMay (predecessors ns)
ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns) ownService <- nodeService <$> (liftIO . readTVarIO $ parentRealNode ns)
migrationResult <- liftIO $ migrateData ownService (getNid ns) (getNid migrateToNode) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode) -- previously held data is the one between the immediate predecessor and
-- the own ID
migrationResult <- liftIO $ migrateData ownService (getNid ns) lowerKeyBound (getNid ns) (getDomain migrateToNode, fromIntegral $ getServicePort migrateToNode)
liftEither migrationResult liftEither migrationResult
-- | Wait for new cache entries to appear and then try joining on them. -- | Wait for new cache entries to appear and then try joining on them.
-- Exits after successful joining. -- Exits after successful joining.
joinOnNewEntriesThread :: LocalNodeStateSTM s -> IO () joinOnNewEntriesThread :: Service s (RealNodeSTM s) => LocalNodeStateSTM s -> IO ()
joinOnNewEntriesThread nsSTM = loop joinOnNewEntriesThread nsSTM = loop
where where
loop = do loop = do
@ -576,7 +581,7 @@ sendThread sock sendQ = forever $ do
sendAllTo sock packet addr sendAllTo sock packet addr
-- | Sets up and manages the main server threads of FediChord -- | Sets up and manages the main server threads of FediChord
fediMainThreads :: Socket -> LocalNodeStateSTM s -> IO () fediMainThreads :: Service s (RealNodeSTM s) => Socket -> LocalNodeStateSTM s -> IO ()
fediMainThreads sock nsSTM = do fediMainThreads sock nsSTM = do
ns <- readTVarIO nsSTM ns <- readTVarIO nsSTM
putStrLn "launching threads" putStrLn "launching threads"
@ -619,7 +624,8 @@ requestMapPurge mapVar = forever $ do
-- | Wait for messages, deserialise them, manage parts and acknowledgement status, -- | Wait for messages, deserialise them, manage parts and acknowledgement status,
-- and pass them to their specific handling function. -- and pass them to their specific handling function.
fediMessageHandler :: TQueue (BS.ByteString, SockAddr) -- ^ send queue fediMessageHandler :: Service s (RealNodeSTM s)
=> TQueue (BS.ByteString, SockAddr) -- ^ send queue
-> TQueue (BS.ByteString, SockAddr) -- ^ receive queue -> TQueue (BS.ByteString, SockAddr) -- ^ receive queue
-> LocalNodeStateSTM s -- ^ acting NodeState -> LocalNodeStateSTM s -- ^ acting NodeState
-> IO () -> IO ()

View file

@ -426,10 +426,13 @@ class Service s d where
getListeningPortFromService :: (Integral i) => s d -> i getListeningPortFromService :: (Integral i) => s d -> i
-- | trigger a service data migration of data between the two given keys -- | trigger a service data migration of data between the two given keys
migrateData :: s d migrateData :: s d
-> NodeID -- ^ source/ sender node ID
-> NodeID -- ^ start key -> NodeID -- ^ start key
-> NodeID -- ^ end key -> NodeID -- ^ end key
-> (String, Int) -- ^ hostname and port of target service -> (String, Int) -- ^ hostname and port of target service
-> IO (Either String ()) -- ^ success or failure -> IO (Either String ()) -- ^ success or failure
-- | Wait for an incoming migration from a given node to succeed, may block forever
waitForMigrationFrom :: s d -> NodeID -> IO ()
instance Hashable.Hashable NodeID where instance Hashable.Hashable NodeID where
hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID hashWithSalt salt = Hashable.hashWithSalt salt . getNodeID

View file

@ -10,6 +10,7 @@ module Hash2Pub.PostService where
import Control.Concurrent import Control.Concurrent
import Control.Concurrent.Async import Control.Concurrent.Async
import Control.Concurrent.MVar
import Control.Concurrent.STM import Control.Concurrent.STM
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
import Control.Concurrent.STM.TChan import Control.Concurrent.STM.TChan
@ -46,20 +47,21 @@ import Hash2Pub.RingMap
data PostService d = PostService data PostService d = PostService
{ serviceConf :: ServiceConf { serviceConf :: ServiceConf
-- queues, other data structures -- queues, other data structures
, baseDHT :: (DHT d) => d , baseDHT :: (DHT d) => d
, serviceThread :: TVar ThreadId , serviceThread :: TVar ThreadId
, subscribers :: TVar RelayTags , subscribers :: TVar RelayTags
-- ^ for each tag store the subscribers + their queue -- ^ for each tag store the subscribers + their queue
, ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime) , ownSubscriptions :: TVar (HMap.HashMap NodeID POSIXTime)
-- ^ tags subscribed by the own node have an assigned lease time -- ^ tags subscribed by the own node have an assigned lease time
, ownPosts :: TVar (HSet.HashSet Txt.Text) , ownPosts :: TVar (HSet.HashSet Txt.Text)
-- ^ just store the existence of posts for saving memory, -- ^ just store the existence of posts for saving memory,
, relayInQueue :: TQueue (Hashtag, PostID, PostContent) , relayInQueue :: TQueue (Hashtag, PostID, PostContent)
-- ^ Queue for processing incoming posts of own instance asynchronously -- ^ Queue for processing incoming posts of own instance asynchronously
, postFetchQueue :: TQueue PostID , postFetchQueue :: TQueue PostID
, httpMan :: HTTP.Manager , migrationsInProgress :: TVar (HMap.HashMap NodeID (MVar ()))
, httpMan :: HTTP.Manager
} }
deriving (Typeable) deriving (Typeable)
@ -86,6 +88,7 @@ instance DHT d => Service PostService d where
ownPostVar <- newTVarIO HSet.empty ownPostVar <- newTVarIO HSet.empty
relayInQueue' <- newTQueueIO relayInQueue' <- newTQueueIO
postFetchQueue' <- newTQueueIO postFetchQueue' <- newTQueueIO
migrationsInProgress' <- newTVarIO HMap.empty
httpMan' <- HTTP.newManager HTTP.defaultManagerSettings httpMan' <- HTTP.newManager HTTP.defaultManagerSettings
let let
thisService = PostService { thisService = PostService {
@ -97,6 +100,7 @@ instance DHT d => Service PostService d where
, ownPosts = ownPostVar , ownPosts = ownPostVar
, relayInQueue = relayInQueue' , relayInQueue = relayInQueue'
, postFetchQueue = postFetchQueue' , postFetchQueue = postFetchQueue'
, migrationsInProgress = migrationsInProgress'
, httpMan = httpMan' , httpMan = httpMan'
} }
port' = fromIntegral (confServicePort conf) port' = fromIntegral (confServicePort conf)
@ -117,6 +121,17 @@ instance DHT d => Service PostService d where
migrateData = clientDeliverSubscriptions migrateData = clientDeliverSubscriptions
waitForMigrationFrom serv fromID = do
migrationSynchroniser <- atomically $ do
syncPoint <- HMap.lookup fromID <$> readTVar (migrationsInProgress serv)
maybe
-- decision: this function blocks until it gets an incoming migration from given ID
retry
pure
syncPoint
-- block until migration finished
takeMVar migrationSynchroniser
-- | return a WAI application -- | return a WAI application
postServiceApplication :: DHT d => PostService d -> Application postServiceApplication :: DHT d => PostService d -> Application
@ -136,7 +151,7 @@ placeholderPost = Txt.take 5120 . Txt.repeat $ 'O' -- size 5KiB
type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent type PostServiceAPI = "relay" :> "inbox" :> Capture "hashtag" Txt.Text :> ReqBody '[PlainText] Txt.Text :> PutCreated '[PlainText] NoContent
-- delivery endpoint of newly published posts of the relay's instance -- delivery endpoint of newly published posts of the relay's instance
:<|> "relay" :> "subscribers" :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text :<|> "relay" :> "subscribers" :> Capture "senderID" Integer :> ReqBody '[PlainText] Txt.Text :> PostNoContent '[PlainText] Txt.Text
-- endpoint for delivering the subscriptions and outstanding queue -- endpoint for delivering the subscriptions and outstanding queue
:<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text :<|> "post" :> Capture "postid" Txt.Text :> Get '[PlainText] Txt.Text
-- fetch endpoint for posts, full post ID is http://$domain/post/$postid -- fetch endpoint for posts, full post ID is http://$domain/post/$postid
@ -194,10 +209,14 @@ newtype UnhandledTagException = UnhandledTagException String
instance Exception UnhandledTagException instance Exception UnhandledTagException
subscriptionDelivery :: DHT d => PostService d -> Txt.Text -> Handler Txt.Text subscriptionDelivery :: DHT d => PostService d -> Integer -> Txt.Text -> Handler Txt.Text
subscriptionDelivery serv subList = do subscriptionDelivery serv senderID subList = do
let let
tagSubs = Txt.lines subList tagSubs = Txt.lines subList
-- signal that the migration is in progress
syncMVar <- liftIO newEmptyMVar
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.insert (fromInteger senderID) syncMVar
-- In favor of having the convenience of rolling back the transaction once a -- In favor of having the convenience of rolling back the transaction once a
-- not-handled tag occurs, this results in a single large transaction. -- not-handled tag occurs, this results in a single large transaction.
-- Hopefully the performance isn't too bad. -- Hopefully the performance isn't too bad.
@ -211,6 +230,12 @@ subscriptionDelivery serv subList = do
`catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException)) `catchSTM` (\e -> pure . Left $ show (e :: UnhandledTagException))
-- TODO: potentially log this -- TODO: potentially log this
:: STM (Either String ())) :: STM (Either String ()))
-- TODO: should this always signal migration finished to avoid deadlocksP
liftIO $ putMVar syncMVar () -- wakes up waiting thread
liftIO $ putMVar syncMVar () -- blocks until waiting thread has resumed
-- delete this migration from ongoing ones
liftIO . atomically $ modifyTVar' (migrationsInProgress serv) $
HMap.delete (fromInteger senderID)
case res of case res of
Left err -> throwError err410 {errBody = BSUL.fromString err} Left err -> throwError err410 {errBody = BSUL.fromString err}
Right _ -> pure "" Right _ -> pure ""
@ -322,11 +347,12 @@ relayInboxClient :<|> subscriptionDeliveryClient :<|> postFetchClient :<|> postM
-- and their outstanding delivery queue to another instance. -- and their outstanding delivery queue to another instance.
-- If the transfer succeeds, the transfered subscribers are removed from the local list. -- If the transfer succeeds, the transfered subscribers are removed from the local list.
clientDeliverSubscriptions :: PostService d clientDeliverSubscriptions :: PostService d
-> NodeID -- ^ sender node ID
-> NodeID -- ^ fromTag -> NodeID -- ^ fromTag
-> NodeID -- ^ toTag -> NodeID -- ^ toTag
-> (String, Int) -- ^ hostname and port of instance to deliver to -> (String, Int) -- ^ hostname and port of instance to deliver to
-> IO (Either String ()) -- Either signals success or failure -> IO (Either String ()) -- Either signals success or failure
clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do clientDeliverSubscriptions serv fromNode fromKey toKey (toHost, toPort) = do
-- collect tag intearval -- collect tag intearval
intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv) intervalTags <- takeRMapSuccessorsFromTo fromKey toKey <$> readTVarIO (subscribers serv)
-- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ] -- returns a [ (TagSubscribersSTM, TChan PostID, Hashtag) ]
@ -350,7 +376,7 @@ clientDeliverSubscriptions serv fromKey toKey (toHost, toPort) = do
"" ""
intervalTags intervalTags
-- send subscribers -- send subscribers
resp <- runClientM (subscriptionDeliveryClient subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) "")) resp <- runClientM (subscriptionDeliveryClient (getNodeID fromNode) subscriberData) (mkClientEnv (httpMan serv) (BaseUrl Http toHost (fromIntegral toPort) ""))
-- on failure return a Left, otherwise delete subscription entry -- on failure return a Left, otherwise delete subscription entry
case resp of case resp of
Left err -> pure . Left . show $ err Left err -> pure . Left . show $ err