Skip to content

Commit c6bff0f

Browse files
committed
Plumb WebSocket observation functionality through the FFI
This will allow Android and iOS clients to observe the underlying state of the WebSocket connection to the Convex backend.
1 parent 2e3b727 commit c6bff0f

File tree

8 files changed

+91
-17
lines changed

8 files changed

+91
-17
lines changed

android/convexmobile/src/androidTest/java/dev/convex/android/ConvexClientInstrumentationTest.kt

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -212,6 +212,28 @@ class ConvexClientInstrumentationTest {
212212
assertEquals(expected, result)
213213
assertEquals(100, result.aPlainInt)
214214
}
215+
216+
@Test
217+
fun can_observe_websocket_state() = runTest {
218+
val client = ConvexClient(DEPLOYMENT_URL)
219+
220+
val states = mutableListOf<WebSocketState>()
221+
222+
val receiveJob = launch {
223+
client.webSocketStateFlow.take(2).collect { state ->
224+
states.add(state)
225+
}
226+
}
227+
228+
// It doesn't really matter which Convex function we call - but calling one should trigger
229+
// a connection.
230+
client.mutation<Unit?>("messages:clearAll")
231+
232+
receiveJob.join()
233+
234+
assertEquals(2, states.size)
235+
assertEquals(listOf(WebSocketState.CONNECTING, WebSocketState.CONNECTED), states)
236+
}
215237
}
216238

217239
@Serializable

android/convexmobile/src/androidTest/java/dev/convex/android/ConvexClientWithAuthTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ class ConvexClientWithAuthTest {
2626
client = ConvexClientWithAuth(
2727
"foo://bar",
2828
authProvider
29-
) { _, _ -> ffiClient }
29+
) { _, _, _ -> ffiClient }
3030
}
3131

3232
@Test

android/convexmobile/src/main/java/dev/convex/android/ConvexClient.kt

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ import kotlinx.coroutines.channels.awaitClose
77
import kotlinx.coroutines.flow.Flow
88
import kotlinx.coroutines.flow.MutableStateFlow
99
import kotlinx.coroutines.flow.StateFlow
10+
import kotlinx.coroutines.flow.asStateFlow
1011
import kotlinx.coroutines.flow.callbackFlow
1112
import kotlinx.serialization.Contextual
1213
import kotlinx.serialization.Serializable
@@ -46,12 +47,29 @@ typealias ConvexNum = Contextual
4647
*/
4748
open class ConvexClient(
4849
deploymentUrl: String,
49-
ffiClientFactory: (deploymentUrl: String, clientId: String) -> MobileConvexClientInterface = ::MobileConvexClient
50+
ffiClientFactory: (deploymentUrl: String, clientId: String, webSocketStateSubscriber: WebSocketStateSubscriber?) -> MobileConvexClientInterface = ::MobileConvexClient
5051
) {
52+
private val _webSocketStateFlow = MutableStateFlow(WebSocketState.CONNECTING)
53+
private val webSocketStateSubscriber = object : WebSocketStateSubscriber {
54+
override fun onStateChange(state: WebSocketState) {
55+
_webSocketStateFlow.value = state
56+
}
57+
}
5158

5259
@PublishedApi
5360
internal val ffiClient =
54-
ffiClientFactory(deploymentUrl, "kotlin-${BuildConfig.LIBRARY_VERSION}")
61+
ffiClientFactory(
62+
deploymentUrl,
63+
"kotlin-${BuildConfig.LIBRARY_VERSION}",
64+
webSocketStateSubscriber
65+
)
66+
67+
/**
68+
* A [Flow] of [WebSocketState].
69+
*
70+
* Will change states as the underlying state of the WebSocket connection to Convex changes.
71+
*/
72+
val webSocketStateFlow: StateFlow<WebSocketState> = _webSocketStateFlow.asStateFlow()
5573

5674
/**
5775
* Subscribes to the query with the given [name] and converts data from the subscription into a
@@ -195,7 +213,7 @@ open class ConvexClient(
195213
class ConvexClientWithAuth<T>(
196214
deploymentUrl: String,
197215
private val authProvider: AuthProvider<T>,
198-
ffiClientFactory: (deploymentUrl: String, clientId: String) -> MobileConvexClientInterface = ::MobileConvexClient
216+
ffiClientFactory: (deploymentUrl: String, clientId: String, webSocketSocketStateSubscriber: WebSocketStateSubscriber?) -> MobileConvexClientInterface = ::MobileConvexClient
199217
) : ConvexClient(deploymentUrl, ffiClientFactory) {
200218
private val _authState = MutableStateFlow<AuthState<T>>(AuthState.Unauthenticated())
201219

android/convexmobile/src/test/java/dev/convex/android/ConvexClientTest.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ class ConvexClientTest {
3939
@Before
4040
fun setup() {
4141
ffiClient = FakeFfiClient()
42-
client = ConvexClient("foo://bar") { _, _ -> ffiClient }
42+
client = ConvexClient("foo://bar") { _, _, _ -> ffiClient }
4343
}
4444

4545
@Test

rust/Cargo.lock

Lines changed: 1 addition & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
[package]
22
name = "convex-mobile"
3-
version = "0.2.2"
3+
version = "0.3.0"
44
edition = "2021"
55

66
[dependencies]

rust/src/convex-mobile.udl

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,18 @@ interface ClientError {
99
ServerError(string msg);
1010
};
1111

12+
enum WebSocketState {
13+
"Connected",
14+
"Connecting",
15+
};
16+
17+
[Trait, WithForeign]
18+
interface WebSocketStateSubscriber {
19+
void on_state_change(WebSocketState state);
20+
};
21+
1222
interface MobileConvexClient {
13-
constructor(string deployment_url, string client_id);
23+
constructor(string deployment_url, string client_id, WebSocketStateSubscriber? web_socket_state_subscriber);
1424

1525
[Async, Throws=ClientError]
1626
string query(string name, record<string, string> args);
@@ -37,4 +47,4 @@ interface SubscriptionHandle {
3747
interface QuerySubscriber {
3848
void on_update(string value);
3949
void on_error(string message, string? value);
40-
};
50+
};

rust/src/lib.rs

Lines changed: 32 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,13 @@ use std::{
44
};
55

66
use async_once_cell::OnceCell;
7-
use convex::{ConvexClient, ConvexClientBuilder, FunctionResult, Value};
7+
use convex::{ConvexClient, ConvexClientBuilder, FunctionResult, Value, WebSocketState};
88
use futures::{
99
channel::oneshot::{self, Sender},
1010
pin_mut, select_biased, FutureExt, StreamExt,
1111
};
1212
use parking_lot::Mutex;
13+
use tokio::sync::mpsc;
1314
use tracing::{debug, info};
1415

1516
mod logging;
@@ -43,6 +44,10 @@ pub trait QuerySubscriber: Send + Sync {
4344
fn on_error(&self, message: String, value: Option<String>) -> ();
4445
}
4546

47+
pub trait WebSocketStateSubscriber: Send + Sync {
48+
fn on_state_change(&self, state: WebSocketState) -> ();
49+
}
50+
4651
pub struct SubscriptionHandle {
4752
cancel_sender: Mutex<Option<Sender<()>>>,
4853
}
@@ -62,9 +67,9 @@ impl SubscriptionHandle {
6267
}
6368

6469
/// Initializes logging.
65-
///
70+
///
6671
/// Call this early in the life of your application to enable logging from
67-
/// [MobileConvexClient] and its dependencies.
72+
/// [MobileConvexClient] and its dependencies.
6873
pub fn init_convex_logging() {
6974
use std::sync::Once;
7075
static INIT: Once = Once::new();
@@ -84,6 +89,7 @@ pub fn init_convex_logging() {
8489
struct MobileConvexClient {
8590
deployment_url: String,
8691
client_id: String,
92+
web_socket_state_subscriber: Option<Arc<dyn WebSocketStateSubscriber>>,
8793
client: OnceCell<ConvexClient>,
8894
rt: tokio::runtime::Runtime,
8995
}
@@ -96,14 +102,19 @@ impl MobileConvexClient {
96102
///
97103
/// The `client_id` should be a string representing the name and version of
98104
/// the foreign client.
99-
pub fn new(deployment_url: String, client_id: String) -> MobileConvexClient {
105+
pub fn new(
106+
deployment_url: String,
107+
client_id: String,
108+
web_socket_state_subscriber: Option<Arc<dyn WebSocketStateSubscriber>>,
109+
) -> MobileConvexClient {
100110
let rt = tokio::runtime::Builder::new_multi_thread()
101111
.enable_all()
102112
.build()
103113
.unwrap();
104114
MobileConvexClient {
105115
deployment_url,
106116
client_id,
117+
web_socket_state_subscriber,
107118
client: OnceCell::new(),
108119
rt,
109120
}
@@ -122,12 +133,25 @@ impl MobileConvexClient {
122133
self.client
123134
.get_or_try_init(async {
124135
let client_id = self.client_id.to_owned();
136+
let (tx, mut rx) = mpsc::channel(1);
137+
let possible_subscriber = self.web_socket_state_subscriber.clone();
138+
if let Some(subscriber) = possible_subscriber.clone() {
139+
self.rt.spawn(async move {
140+
while let Some(state) = rx.recv().await {
141+
subscriber.on_state_change(state);
142+
}
143+
});
144+
}
145+
146+
let has_subscriber = possible_subscriber.is_some();
125147
self.rt
126148
.spawn(async move {
127-
ConvexClientBuilder::new(url.as_str())
128-
.with_client_id(&client_id)
129-
.build()
130-
.await
149+
let mut builder =
150+
ConvexClientBuilder::new(url.as_str()).with_client_id(&client_id);
151+
if has_subscriber {
152+
builder = builder.with_on_state_change(tx);
153+
}
154+
builder.build().await
131155
})
132156
.await?
133157
})

0 commit comments

Comments
 (0)