-
Notifications
You must be signed in to change notification settings - Fork 219
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Connected LivenessService to comms pub/sub #686
Conversation
@@ -0,0 +1,140 @@ | |||
// Copyright 2019 The Tari Project |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should submit this as a PR to the Tower repo, if it it's generally useful
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Was thinking that it could potentially be a FutureExt
combinator for tower services, will look into it
ba5efe2
to
968e122
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is some crazy stuff, when we get it polished it's going to be really slick.
So I have to agree with Cayle that the ServiceHandle, AndThenService and service composition stuff is extremely difficult to grok. I think it will be valuable to spend some time writing an extensive overview doc about how it all fits together because the few docstrings that there are do not actually describe how the full system goes together and how it resolves and, to be honest, I still don't 100% understand the full process of resolving the handles into a composed multi service thing.
Secondly, I think the Executor service stuff should move out of the P2P crate as I think it is going to be the framework for all domain services not just those used for the P2P stuff so lets talk about where it goes? This is essentially the framework we are going to use to compose our full applications.
|
||
/// Initializer for the Liveness service handle and service future. | ||
pub struct LivenessInitializer { | ||
inbound_subscriber: Arc<InboundTopicSubscriber<TariMessageType>>, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This feels a little weird. Perhaps we should find a way for a service to require that on creation it is passed an already constructed TopicSubscription with the correct topic and then make it the problem of the thing that constructs the services?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ye this is service dependent - so you can do it in any way you like - however I'd like to keep service specific code in the initializer for that service rather than passing in LivenessInitializer::new(comms.inbound_subcriber().subscription(TariMessageType::new(NetMessage::PingPong))
You can do LivenessInitializer::new(comms.inbound_subscriber())
- might even just pass in comms and leave the linking of comms to the service completely up to the service initializer implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think pass in the Comms
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Done - although the downside is that this requires a full comms stack to test. I've added a function to create the LivenessInitializer from a subscriber to keep tests easy. In the near future, we probably want to make it easy to setup a full (development) comms stack in integration tests
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making a mock Comms stack that just has the Subscriber and Future based interface should be pretty easy.
|
||
fn poll(&mut self) -> Poll<Self::Item, Self::Error> { | ||
let msg = self.message.take().expect("poll called twice on Deserializer"); | ||
blocking(|| { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems overkill to need blocking here? It's just a memory mapping? This is definitely a discussion to have to decide where the threshold is for an operation to be put into Blocking but a straight up memory based read seems like its plenty fast enough to be done directly?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yup - maybe - not sure either - it's a memory mapping because we happen to be using bincode - however that could change and is up to the MessageFormat impl. Tasks are super cheap to create according to tokio. Definitely up for debate and I'm sure we'll come across many of these cases.
|
||
impl<T: MessageFormat> Future for DomainMessageDeserializer<T> { | ||
type Error = BlockingError; | ||
type Item = Result<(MessageInfo, T), MessageError>; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we are going as far as to make a future for this then I think its time to have a proper struct to hold the fields of the (MessageInfo, T) tuple.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No strong opinions on that - will do in the next iteration :)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is more apparent that this tuple is janky when you look at the code that uses it. When you have matches and code that are using result.0 and result.1 to access the two fields it doesn't look good.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've started with using a DomainMessage<T>
struct in another branch which I'll PR soon.
So I just read what you wrote in the PR and I think that goes a long way to clarifying how the Service framework should be used. The academic in me also thinks that the whole structure could be neatly conveyed/summarized with a simple diagram. |
074a9d4
to
9cbd1a9
Compare
Could you squash all your commits together please? |
aa6fc28
to
4cf5d76
Compare
- LivenessHandler connected up to a stream of PingPong messages - Refactored the way services are initialized, keeping it super simple and to allow more flexibility.
4cf5d76
to
f7408fc
Compare
Merge pull request #686 Description LivenessHandler connected up to a stream of PingPong messages Refactored the way services are initialized, keeping it super simple and to allow more flexibility Tari Services are one or more (usually) long-running asynchronous tasks which furnish request via a provided interface (called a Handle). Dropping the handle must cancel the task. This is (usually) implemented using an mpsc channel and/or subscription stream. That is, once all senders have disconnected, the receiver stream will close and so will the task. Services usually subscribe to one or many comms messages and update state or perform operations as necessary. Services are set up by implementing the ServiceInitializer trait - this trait can be passed to the StackBuilder which is responsible for initializing the services and signalling when all services are initialized. Services can access other service handles allowing them to make requests on other services. This is done by retrieving the appropriate handle from the ServiceHandles collection which is provided to every ServiceInitializer. Handles can easily (but not necessarily) be implemented by using the tari_p2p::executor::transport::channel(S) function. This function takes in a service S and returns a Requester/Responder pair - the requester can be used to make asynchronous requests to the given service S. A handle, however, can be implemented in whatever way the service implementer sees fit. Motivation and Context Closes #676 Ref #621 How Has This Been Tested? Unit tests (integration tests to follow)
Description
and to allow more flexibility
Tari Services are one or more (usually) long-running asynchronous tasks which furnish request via a provided interface (called a Handle). Dropping the handle must cancel the task. This is (usually) implemented using an mpsc channel and/or subscription stream. That is, once all senders have disconnected, the receiver stream will close and so will the task. Services usually subscribe to one or many comms messages and update state or perform operations as necessary.
Services are set up by implementing the ServiceInitializer trait - this trait can be passed to the StackBuilder which is responsible for initializing the services and signalling when all services are initialized.
Services can access other service handles allowing them to make requests on other services. This is done by retrieving the appropriate handle from the ServiceHandles collection which is provided to every ServiceInitializer.
Handles can easily (but not necessarily) be implemented by using the
tari_p2p::executor::transport::channel(S)
function. This function takes in a serviceS
and returns a Requester/Responder pair - the requester can be used to make asynchronous requests to the given serviceS
. A handle, however, can be implemented in whatever way the service implementer sees fit.Motivation and Context
Closes #676
Ref #621
How Has This Been Tested?
Unit tests (integration tests to follow)
Types of changes
Checklist:
development
branchcargo-fmt --all
before pushing