Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/query/service/src/servers/http/middleware/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ use crate::servers::http::error::HttpErrorCode;
use crate::servers::http::error::JsonErrorOnly;
use crate::servers::http::error::QueryError;
use crate::servers::http::middleware::session_header::ClientSession;
use crate::servers::http::middleware::session_header::ClientSessionType;
use crate::servers::http::middleware::ClientCapabilities;
use crate::servers::http::v1::HttpQueryContext;
use crate::servers::http::v1::SessionClaim;
Expand Down Expand Up @@ -453,8 +454,10 @@ impl<E> HTTPSessionEndpoint<E> {
// log every request, which can be distinguished by `session_id = ''`
login_history.disable_write = true;
}
s.try_refresh_state(session.get_current_tenant(), &user_name, req.cookie())
.await?;
if ClientSessionType::IDOnly != s.typ {
s.try_refresh_state(session.get_current_tenant(), &user_name, req.cookie())
.await?;
}
}

let session = session_manager.register_session(session)?;
Expand Down
15 changes: 4 additions & 11 deletions src/query/service/src/servers/http/middleware/session_header.rs
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ impl ClientSession {
id,
last_refresh_time: SystemTime::now(),
},
typ: ClientSessionType::Cookie,
typ: ClientSessionType::IDOnly,
is_new_session: false,
refreshed: false,
});
Expand Down Expand Up @@ -215,16 +215,9 @@ impl ClientSession {
if ClientSessionType::IDOnly == self.typ
|| elapsed > client_session_mgr.min_refresh_interval
{
if client_session_mgr.refresh_in_memory_states(&self.header.id, user_name) {
client_session_mgr
.refresh_session_handle(tenant, user_name.to_string(), &self.header.id)
.await?;
info!(
"[HTTP-SESSION] refreshing session {} after {} seconds",
self.header.id,
elapsed.as_secs(),
);
}
client_session_mgr
.try_refresh_state(tenant, &self.header.id, user_name)
.await?;
self.refreshed = true;
if ClientSessionType::Cookie == self.typ {
cookie.add(make_cookie(
Expand Down
3 changes: 2 additions & 1 deletion src/query/service/src/servers/http/v1/http_query_handlers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -398,14 +398,15 @@ async fn query_page_handler(
Path((query_id, page_no)): Path<(String, usize)>,
) -> PoemResult<impl IntoResponse> {
ctx.check_node_id(&query_id)?;
// tracing in middleware

let http_query_manager = HttpQueryManager::instance();

let Some(query) = http_query_manager.get_query(&query_id) else {
return Err(query_id_not_found(&query_id, &ctx.node_id));
};

ctx.try_refresh_worksheet_session().await.ok();

let query_mem_stat = query.query_mem_stat.clone();

let query_page_handle = {
Expand Down
1 change: 1 addition & 0 deletions src/query/service/src/servers/http/v1/query/http_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -587,6 +587,7 @@ impl HttpQuery {
http_ctx: &HttpQueryContext,
req: HttpQueryRequest,
) -> Result<HttpQuery> {
http_ctx.try_set_worksheet_session(&req.session).await?;
let (session, ctx) = http_ctx
.create_session(&req.session, SessionType::HTTPQuery)
.await?;
Expand Down
50 changes: 50 additions & 0 deletions src/query/service/src/servers/http/v1/query/http_query_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use poem::RequestBody;

use crate::auth::Credential;
use crate::servers::http::middleware::session_header::ClientSession;
use crate::servers::http::middleware::session_header::ClientSessionType;
use crate::servers::http::middleware::ClientCapabilities;
use crate::servers::http::v1::ClientSessionManager;
use crate::servers::http::v1::HttpQueryManager;
Expand Down Expand Up @@ -112,7 +113,56 @@ impl HttpQueryContext {
return Err(poem::Error::from_string(msg, StatusCode::NOT_FOUND));
}
}
Ok(())
}

pub async fn try_refresh_worksheet_session(&self) -> databend_common_exception::Result<()> {
if let Some(client_session) = self.client_session.as_ref() {
if client_session.typ == ClientSessionType::IDOnly {
ClientSessionManager::instance()
.try_refresh_state(
self.session.get_current_tenant(),
&client_session.header.id,
&self.user_name,
)
.await?;
}
}
Ok(())
}

pub async fn try_set_worksheet_session(
&self,
http_session_conf: &Option<HttpSessionConf>,
) -> databend_common_exception::Result<()> {
if let Some(client_session) = self.client_session.as_ref() {
if client_session.typ == ClientSessionType::IDOnly {
if let Some(conf) = http_session_conf.as_ref() {
match &conf.internal {
Some(internal) => {
if internal.has_temp_table {
ClientSessionManager::instance()
.try_refresh_state(
self.session.get_current_tenant(),
&client_session.header.id,
&self.user_name,
)
.await?;
}
}
None => {
ClientSessionManager::instance()
.drop_client_session_state(
&self.session.get_current_tenant(),
&self.user_name,
&client_session.header.id,
)
.await?;
}
}
}
}
}
Ok(())
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,19 @@ impl ClientSessionManager {
}
}
}
pub async fn try_refresh_state(
&self,
tenant: Tenant,
sid: &str,
user_name: &str,
) -> Result<()> {
if self.refresh_in_memory_states(sid, user_name) {
self.refresh_session_handle(tenant, user_name.to_string(), sid)
.await?;
info!("[HTTP-SESSION] refreshing session {}", sid);
}
Ok(())
}

pub async fn refresh_session_handle(
&self,
Expand Down
10 changes: 8 additions & 2 deletions tests/suites/1_stateful/09_http_handler/09_0007_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,8 +167,9 @@ def test_no_session():
HEADER_SESSION_ID_V = "101010"


def do_query_from_worksheet(client, sql, sid=HEADER_SESSION_ID_V):
payload = {"sql": sql, "pagination": {"max_rows_per_page": 2, "wait_time_secs": 10}}
def do_query_from_worksheet(client, sql, sid=HEADER_SESSION_ID_V, new_session=False):
internal = None if new_session else "{}";
payload = {"sql": sql, "pagination": {"max_rows_per_page": 2, "wait_time_secs": 10}, "session": {"internal": internal}}
resp = client.post(
query_url,
auth=auth,
Expand Down Expand Up @@ -204,6 +205,11 @@ def test_worksheet_session():
resp = do_query_from_worksheet(client, "select * from t09_0007")
assert resp["data"] == [["1"]], resp

resp = do_query_from_worksheet(client, "select * from t09_0007", new_session=True)
assert "Unknown table" in resp["error"]["message"], resp

resp = do_query_from_worksheet(client, "select * from numbers(10) ", new_session=True)
assert len(resp["data"]) == 2, resp

def main():
test_no_session()
Expand Down