Skip to content

Commit c69f550

Browse files
committed
feat: enhance resource scheduling logs with clear status and configuration details
- Add detailed status information for workload group and warehouse limits - Display configuration parameter names (max_concurrency, max_running_queries) - Show real-time resource usage (X/Y slots used, queue length) - Improve user experience by clearly identifying bottlenecks and stages - Enable faster troubleshooting and optimization of resource limits
1 parent e2882f3 commit c69f550

File tree

1 file changed

+37
-11
lines changed

1 file changed

+37
-11
lines changed

src/query/service/src/sessions/queue_mgr.rs

Lines changed: 37 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -229,25 +229,35 @@ impl<Data: QueueData> QueueManager<Data> {
229229
workload_group_timeout = std::cmp::min(queue_timeout, workload_group_timeout);
230230
}
231231

232-
data.set_status("[QUERY-QUEUE] Waiting for local workload semaphore");
233-
234232
let semaphore = workload_group.semaphore.clone();
235-
let acquire = tokio::time::timeout(timeout, semaphore.acquire_owned());
233+
let used_slots = permits - semaphore.available_permits();
234+
data.set_status(&format!("[BLOCKED] Workload group '{}' local limit (max_concurrency={}): {}/{} slots used",
235+
workload_group.meta.name, permits, used_slots, permits));
236+
237+
let acquire = tokio::time::timeout(timeout, semaphore.clone().acquire_owned());
236238
let queue_future = AcquireQueueFuture::create(data.clone(), acquire, self.clone());
237239

238240
guards.push(queue_future.await?);
239241

242+
let used_slots_after = permits - semaphore.available_permits();
240243
info!(
241-
"[QUERY-QUEUE] Successfully acquired from local workload semaphore. elapsed: {:?}",
242-
instant.elapsed()
244+
"[ACQUIRED] Workload group '{}' local (max_concurrency={}): {}/{} slots used (waited {:.2}s)",
245+
workload_group.meta.name,
246+
permits,
247+
used_slots_after,
248+
permits,
249+
instant.elapsed().as_secs_f64()
243250
);
244251

245252
timeout -= instant.elapsed();
246253

247254
// Prevent concurrent access to meta and serialize the submission of acquire requests.
248255
// This ensures that at most permits + nodes acquirers will be in the queue at any given time.
249256
let _guard = workload_group.mutex.clone().lock_owned().await;
250-
data.set_status("[QUERY-QUEUE] Waiting for global workload semaphore");
257+
data.set_status(&format!(
258+
"[BLOCKED] Workload group '{}' global limit: acquiring distributed lock",
259+
workload_group.meta.name
260+
));
251261

252262
let workload_queue_guard = self
253263
.acquire_workload_queue(
@@ -259,15 +269,21 @@ impl<Data: QueueData> QueueManager<Data> {
259269
.await?;
260270

261271
info!(
262-
"[QUERY-QUEUE] Successfully acquired from global workload semaphore. elapsed: {:?}",
263-
instant.elapsed()
272+
"[ACQUIRED] Workload group '{}' global: distributed lock acquired (waited {:.2}s)",
273+
workload_group.meta.name,
274+
instant.elapsed().as_secs_f64()
264275
);
265276
timeout -= instant.elapsed();
266277
guards.push(workload_queue_guard);
267278
}
268279
}
269280

270-
data.set_status("[QUERY-QUEUE] Waiting for warehouse resource scheduling");
281+
let available_permits = self.semaphore.available_permits();
282+
let used_slots = self.permits - available_permits;
283+
let queue_length = self.length();
284+
285+
data.set_status(&format!("[BLOCKED] Warehouse limit (max_running_queries={}): {}/{} slots used, {} queries waiting",
286+
self.permits, used_slots, self.permits, queue_length));
271287

272288
guards.extend(self.acquire_warehouse_queue(data, timeout).await?);
273289

@@ -339,6 +355,8 @@ impl<Data: QueueData> QueueManager<Data> {
339355

340356
// Prevent concurrent access to meta and serialize the submission of acquire requests.
341357
// This ensures that at most permits + nodes acquirers will be in the queue at any given time.
358+
data.set_status(&format!("[BLOCKED] Global statement queue (max_running_queries={}): {}/{} slots used, acquiring cluster lock",
359+
self.permits, self.permits - self.semaphore.available_permits(), self.permits));
342360
let _guard = self.tokio_mutex.clone().lock_owned().await;
343361
let acquire = tokio::time::timeout(timeout, semaphore_acquire);
344362
let queue_future = AcquireQueueFuture::create(data, acquire, self.clone());
@@ -351,9 +369,17 @@ impl<Data: QueueData> QueueManager<Data> {
351369
match acquire_res {
352370
Ok(v) => {
353371
guards.push(v);
372+
let available_permits_after = self.semaphore.available_permits();
373+
let used_slots_after = self.permits - available_permits_after;
374+
let queue_length_after = self.length();
375+
376+
if self.global_statement_queue && guards.len() > 1 {
377+
info!("[ACQUIRED] Global statement queue: cluster lock acquired");
378+
}
379+
354380
info!(
355-
"[QUERY-QUEUE] Successfully acquired from queue, current length: {}",
356-
self.length()
381+
"[ACQUIRED] Warehouse (max_running_queries={}): {}/{} slots used, {} queries waiting",
382+
self.permits, used_slots_after, self.permits, queue_length_after
357383
);
358384

359385
Ok(guards)

0 commit comments

Comments
 (0)