-
Notifications
You must be signed in to change notification settings - Fork 5k
enh(interp): fill(prev/next/near/linear) is configured by ignoreNull param #34006
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: 3.0
Are you sure you want to change the base?
Conversation
|
/gemini review |
Summary of ChangesHello @Tony2h, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly improves the flexibility and efficiency of interpolation functions by introducing explicit null handling and a robust operator notification system. The changes allow interpolation queries to better manage null values and enable operators to communicate their processing progress, leading to optimized data fetching and reduced overhead in complex query scenarios. The integration spans message structures, executor logic, storage API, and various operator implementations, ensuring a cohesive enhancement to the system's data processing capabilities. Highlights
🧠 New Feature in Public Preview: You can now enable Memory to help Gemini Code Assist learn from your team's feedback. This makes future code reviews more consistent and personalized to your project's style. Click here to enable Memory in your admin console. Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces a mechanism for the interp operator to notify downstream scan operators to optimize data fetching for fill operations. This is achieved by adding a new task notification type and associated serialization/deserialization logic. The changes span across messaging, executor, and storage layers. While the overall approach is sound, there are several critical memory leaks in the new message deserialization and handling logic that must be addressed. I've also found a potential logic bug in a condition check and some leftover debug code. Please review my comments for details.
| static int32_t tDeserializeSOperatorNotifyParam(SDecoder *pDecoder, SOperatorParam *pOpParam) { | ||
| TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pOpParam->opType)); | ||
| TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &pOpParam->downstreamIdx)); | ||
| switch (pOpParam->opType) { | ||
| case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: { | ||
| pOpParam->value = taosMemoryMalloc(sizeof(int64_t)); | ||
| if (NULL == pOpParam->value) { | ||
| TAOS_CHECK_RETURN(terrno); | ||
| } | ||
| TAOS_CHECK_RETURN(tDecodeI64(pDecoder, (int64_t*)pOpParam->value)); | ||
| break; | ||
| } | ||
| default: | ||
| return TSDB_CODE_INVALID_PARA; | ||
| } | ||
|
|
||
| int32_t childrenNum = 0; | ||
| TAOS_CHECK_RETURN(tDecodeI32(pDecoder, &childrenNum)); | ||
|
|
||
| if (childrenNum > 0) { | ||
| pOpParam->pChildren = taosArrayInit(childrenNum, POINTER_BYTES); | ||
| if (NULL == pOpParam->pChildren) { | ||
| TAOS_CHECK_RETURN(terrno); | ||
| } | ||
| for (int32_t i = 0; i < childrenNum; ++i) { | ||
| SOperatorParam *pChild = taosMemoryCalloc(1, sizeof(SOperatorParam)); | ||
| if (NULL == pChild) { | ||
| TAOS_CHECK_RETURN(terrno); | ||
| } | ||
| TAOS_CHECK_RETURN(tDeserializeSOperatorNotifyParam(pDecoder, pChild)); | ||
| if (taosArrayPush(pOpParam->pChildren, &pChild) == NULL) { | ||
| TAOS_CHECK_RETURN(terrno); | ||
| } | ||
| } | ||
| } else { | ||
| pOpParam->pChildren = NULL; | ||
| } | ||
|
|
||
| TAOS_CHECK_RETURN(tDecodeBool(pDecoder, &pOpParam->reUse)); | ||
|
|
||
| return TSDB_CODE_SUCCESS; | ||
| } |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
| pReq->pOpParam = taosMemoryCalloc(1, sizeof(SOperatorParam)); | ||
| if (NULL == pReq->pOpParam) { | ||
| TAOS_CHECK_EXIT(terrno); | ||
| } | ||
| TAOS_CHECK_EXIT(tDeserializeSOperatorNotifyParam(&decoder, pReq->pOpParam)); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
source/libs/qworker/src/qworker.c
Outdated
| case TASK_NOTIFY_STEP_DONE: { | ||
| if (ctx->taskHandle != NULL) { | ||
| QW_ERR_JRET( | ||
| notifyTableScanTask(ctx->taskHandle, | ||
| ((STaskNotifyReq*)qwMsg->msg)->pOpParam)); | ||
| } | ||
| break; | ||
| } |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
|
|
||
| if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { | ||
| continue; | ||
| if (!notified && (ts < pSliceInfo->win.ekey || ts > pSliceInfo->win.skey)) { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| /* | ||
| @brief Mark the current step done, so next step will be triggered. | ||
| @param pReader the reader to mark the step done | ||
| @param notifyTs the timestamp to notify, used to determine whether |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| // TODO(tony): for debug!!!!!!!!!!! | ||
| // QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) && |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pull request overview
This pull request enhances the interpolation (interp) functionality for fill operations (prev/next/near/linear) to be configured by an ignoreNull parameter. The implementation introduces a notification mechanism that allows the time slice operator to notify downstream table scan operators when a step is complete, optimizing the prev/next scan behavior to avoid unnecessary data fetching.
Key changes include:
- Implementation of a step-done notification system for coordinating between interpolation and scan operators
- Refactoring of timestamp validation logic to properly handle null values during interpolation
- Extension of reader state management to track completion of prev/next/main scan steps
Reviewed changes
Copilot reviewed 22 out of 22 changed files in this pull request and generated 12 comments.
Show a summary per file
| File | Description |
|---|---|
| test/cases/09-DataQuerying/12-Interp/test_query_interp_fill.py | Adds test setup with new tables and test function for interpolation with null value handling |
| test/cases/09-DataQuerying/12-Interp/in/interp_fill_ignore_null.in | Test queries exercising fill(prev/next/linear) with various range and partition scenarios |
| test/cases/09-DataQuerying/12-Interp/ans/interp_fill_ignore_null.csv | Expected results showing proper null handling in interpolation |
| source/libs/executor/src/timesliceoperator.c | Core changes: refactored timestamp validation, added downstream notification logic, simplified loop structure |
| source/libs/executor/src/scanoperator.c | Implements notification handler for table scan operator to mark reader step done |
| source/libs/executor/src/exchangeoperator.c | Adds notification forwarding to remote sources via RPC messages |
| source/libs/executor/src/executor.c | Entry point for notifying table scan tasks from worker nodes |
| source/libs/executor/src/operator.c | Utility function to set notification callback on operators |
| source/libs/executor/src/executorInt.c | Parameter building and cleanup for notification messages |
| source/dnode/vnode/src/tsdb/tsdbRead2.c | Enhanced reader state machine with step tracking and notification handling |
| source/dnode/vnode/src/tsdb/tsdbReadUtil.h | Added EXTERNAL_ROWS_INIT and EXTERNAL_ROWS_DONE states, currentStepDone flag |
| source/dnode/vnode/src/vnd/vnodeApi.c | Registers reader step done callback in API structure |
| source/libs/qworker/src/qworker.c | Handles TASK_NOTIFY_STEP_DONE message type in worker |
| source/libs/qworker/src/qwMsg.c | Passes notify message payload to worker processing |
| source/common/src/msg/tmsg.c | Separates serialization for GET vs NOTIFY operator parameters |
| source/libs/planner/src/planSpliter.c | Temporarily comments out interp function split check (marked as debug code) |
| include/libs/executor/*.h | Function declarations for notification infrastructure |
| include/common/tmsg.h | Adds TASK_NOTIFY_STEP_DONE type and pOpParam field |
| include/libs/executor/storageapi.h | Adds tsdReaderStepDone callback to TsdReader API |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| continue; | ||
| } | ||
|
|
||
| // TODO(Tianyi Zhang): optimize localExec |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| if (!notified && (ts < pSliceInfo->win.ekey || ts > pSliceInfo->win.skey)) { | ||
| /* | ||
| When downstream contains table scan operator, it may do prev/next | ||
| scan to fill(prev/next/linear/near) the first/last row. To reduce | ||
| the overhead, notify it only once when the first valid row comes. | ||
| */ | ||
| code = timeSliceOptrNotifyDownstream(pOperator->pDownstream[0], ts); | ||
| QUERY_CHECK_CODE(code, lino, _end); | ||
| notified = true; |
This comment was marked as abuse.
This comment was marked as abuse.
Sorry, something went wrong.
| code = buildOperatorStepDoneNotifyParam(&pNotifyParam, | ||
| QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN, | ||
| *(int64_t*)param->value); | ||
| if (code != TSDB_CODE_SUCCESS || pNotifyParam == NULL) { | ||
| qError("%s exchange optr failed to build notify param for source %d, " | ||
| "since:%s", GET_TASKID(pTaskInfo), i, tstrerror(code)); | ||
| freeOperatorParam(pNotifyParam, OP_NOTIFY_PARAM); | ||
| continue; | ||
| } |
This comment was marked as abuse.
This comment was marked as abuse.
Sorry, something went wrong.
| @brief Mark the current step done, so next step will be triggered. | ||
| @param pReader the reader to mark the step done | ||
| @param notifyTs the timestamp to notify, used to determine whether | ||
| */ |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| static bool isInvalidTimestamp(STimeSliceOperatorInfo* pSliceInfo, | ||
| int64_t currentTs, SColumnInfoData* pPkCol, | ||
| int32_t curIndex) { | ||
| if (currentTs > pSliceInfo->win.ekey) { | ||
| return false; | ||
| } | ||
| if (pSliceInfo->prevTsSet && currentTs <= pSliceInfo->prevKey.ts) { | ||
| /* | ||
| Input data of time slice operator must be ordered by | ||
| timestamp ascendingly, except the prev scan. | ||
| So prevTs should never be updated to equal or smaller timestamp. | ||
| */ | ||
| return true; | ||
| } |
This comment was marked as abuse.
This comment was marked as abuse.
Sorry, something went wrong.
|
|
||
| if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) { | ||
| continue; | ||
| if (!notified && (ts < pSliceInfo->win.ekey || ts > pSliceInfo->win.skey)) { |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| @param pDownOptr the downstream operator info | ||
| @param pInput the input parameter for the notify function | ||
| @return the code of the operation | ||
| */ | ||
| static int32_t timeSliceOptrNotifyDownstream(SOperatorInfo* pDownOptr, | ||
| int64_t notifyTs) { | ||
| int32_t code = TSDB_CODE_SUCCESS; | ||
| int32_t lino = 0; | ||
| SOperatorParam* pNotifyParam = NULL; | ||
| if (pDownOptr != NULL && pDownOptr->fpSet.notifyFn != NULL) { | ||
| code = buildOperatorStepDoneNotifyParam(&pNotifyParam, | ||
| pDownOptr->operatorType, notifyTs); | ||
| QUERY_CHECK_CODE(code, lino, _end); | ||
|
|
||
| code = pDownOptr->fpSet.notifyFn(pDownOptr, pNotifyParam); |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| SSDataBlock* pResBlock = pSliceInfo->pRes; | ||
| SInterval* pInterval = &pSliceInfo->interval; | ||
| bool notified = false; | ||
| SOperatorParam* pNotifyParam = NULL; |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| void* msg = taosMemoryCalloc(1, msgSize); | ||
| if (msg == NULL) { | ||
| qError("%s failed to alloc memory for notify msg, size:%d", | ||
| GET_TASKID(pTaskInfo), msgSize); | ||
| freeOperatorParam(pNotifyParam, OP_NOTIFY_PARAM); | ||
| continue; | ||
| } | ||
|
|
||
| msgSize = tSerializeSTaskNotifyReq(msg, msgSize, &req); | ||
| if (msgSize < 0) { | ||
| qError("%s failed to serialize notify req for source %d, size:%d", | ||
| GET_TASKID(pTaskInfo), i, msgSize); | ||
| taosMemoryFree(msg); | ||
| freeOperatorParam(pNotifyParam, OP_NOTIFY_PARAM); | ||
| continue; | ||
| } | ||
|
|
||
| // send notify msg to source | ||
| SMsgSendInfo* pMsgSendInfo = taosMemoryCalloc(1, sizeof(SMsgSendInfo)); | ||
| if (pMsgSendInfo == NULL) { | ||
| qError("%s failed to alloc memory for msg send info", | ||
| GET_TASKID(pTaskInfo)); | ||
| taosMemoryFree(msg); | ||
| freeOperatorParam(pNotifyParam, OP_NOTIFY_PARAM); | ||
| continue; | ||
| } | ||
|
|
||
| pMsgSendInfo->msgInfo.pData = msg; | ||
| pMsgSendInfo->msgInfo.len = msgSize; | ||
| pMsgSendInfo->msgType = TDMT_SCH_TASK_NOTIFY; | ||
| pMsgSendInfo->requestId = pTaskInfo->id.queryId; | ||
| pMsgSendInfo->fp = NULL; | ||
| pMsgSendInfo->param = NULL; | ||
| pMsgSendInfo->paramFreeFp = NULL; | ||
|
|
||
| int64_t transporterId = 0; | ||
| code = asyncSendMsgToServer(pExchangeInfo->pTransporter, | ||
| &pSource->addr.epSet, &transporterId, | ||
| pMsgSendInfo); | ||
| if (code != TSDB_CODE_SUCCESS) { | ||
| qError("%s failed to send notify msg to source %d (vgId:%d), code:%s", | ||
| GET_TASKID(pTaskInfo), i, pSource->addr.nodeId, tstrerror(code)); | ||
| taosMemoryFree(msg); | ||
| taosMemoryFree(pMsgSendInfo); | ||
| freeOperatorParam(pNotifyParam, OP_NOTIFY_PARAM); | ||
| continue; | ||
| } |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
|
|
||
| if (pReader->innerReader[0] != NULL && pReader->step == 0) { | ||
| if (pReader->innerReader[0] != NULL && | ||
| pReader->step == EXTERNAL_ROWS_PREV && |
This comment was marked as outdated.
This comment was marked as outdated.
Sorry, something went wrong.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Code Review
This pull request introduces an enhancement for interp queries with fill by adding a notification mechanism to optimize scanning. This allows the timeslice operator to inform downstream scan operators to proceed to the next scanning phase, which is a good optimization. The changes are extensive, touching message serialization, various query operators, and the storage layer. While the overall approach is sound and new tests are included, I've identified several critical memory leaks in the new deserialization logic due to missing error handling. Additionally, there are some minor issues, including leftover debug code, an incomplete comment, and potential for unhelpful error messages due to incorrect line number reporting.
source/common/src/msg/tmsg.c
Outdated
| pOpParam->value = taosMemoryMalloc(sizeof(int64_t)); | ||
| if (NULL == pOpParam->value) { | ||
| TAOS_CHECK_RETURN(terrno); | ||
| } | ||
| TAOS_CHECK_RETURN(tDecodeI64(pDecoder, (int64_t*)pOpParam->value)); | ||
| break; |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
source/common/src/msg/tmsg.c
Outdated
| for (int32_t i = 0; i < childrenNum; ++i) { | ||
| SOperatorParam *pChild = taosMemoryCalloc(1, sizeof(SOperatorParam)); | ||
| if (NULL == pChild) { | ||
| TAOS_CHECK_RETURN(terrno); | ||
| } | ||
| TAOS_CHECK_RETURN(tDeserializeSOperatorNotifyParam(pDecoder, pChild)); | ||
| if (taosArrayPush(pOpParam->pChildren, &pChild) == NULL) { | ||
| TAOS_CHECK_RETURN(terrno); | ||
| } | ||
| } |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| if (!tDecodeIsEnd(&decoder)) { | ||
| TAOS_CHECK_EXIT(tDecodeU64(&decoder, &pReq->clientId)); | ||
| } else { | ||
| pReq->clientId = 0; | ||
| pReq->pOpParam = taosMemoryCalloc(1, sizeof(SOperatorParam)); | ||
| if (NULL == pReq->pOpParam) { | ||
| TAOS_CHECK_EXIT(terrno); | ||
| } | ||
| TAOS_CHECK_EXIT(tDeserializeSOperatorNotifyParam(&decoder, pReq->pOpParam)); | ||
| } |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| // TODO(tony): for debug!!!!!!!!!!! | ||
| // QUERY_NODE_LOGIC_PLAN_INTERP_FUNC != nodeType(pNode->pParent) && |
This comment was marked as duplicate.
This comment was marked as duplicate.
Sorry, something went wrong.
| /* | ||
| @brief Mark the current step done, so next step will be triggered. | ||
| @param pReader the reader to mark the step done | ||
| @param notifyTs the timestamp to notify, used to determine whether |
This comment was marked as duplicate.
This comment was marked as duplicate.
Sorry, something went wrong.
| int32_t code = tsdbAcquireReader(pReader); | ||
| TSDB_CHECK_CODE(code, lino, _end); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
| SOperatorParam* pParam = taosMemoryCalloc(1, sizeof(SOperatorParam)); | ||
| QUERY_CHECK_NULL(pParam, code, lino, _end, terrno); | ||
| pParam->opType = opType; | ||
| pParam->value = taosMemoryCalloc(1, sizeof(int64_t)); | ||
| QUERY_CHECK_NULL(pParam->value, code, lino, _end, terrno); |
This comment was marked as spam.
This comment was marked as spam.
Sorry, something went wrong.
2ba83ee to
79c81c1
Compare
| - RANGE: 指定单个或者开始结束时间值,结束时间须大于开始时间,ts_val 为标准时间戳类型,surrounding_time_val 可选,指定时间范围,为正值,精度可选 1n、1u、1a、1s、1m、1h、1d、1w。如 ```RANGE('2023-10-01T00:00:00.000')``` 、```RANGE('2023-10-01T00:00:00.000', '2023-10-01T23:59:59.999')```、```RANGE('2023-10-01T00:00:00.000', '2023-10-01T23:59:59.999',1h)```。 | ||
| - EVERY: 时间间隔范围,every_val 为正值,精度可选 1n、1u、1a、1s、1m、1h、1d、1w,如 EVERY(1s)。 | ||
| - FILL: 类型可选 NONE (不填充)、VALUE(指定值填充)、PREV(前一个非 NULL 值)、NEXT(后一个非 NULL)、NEAR(前后最近的非 NULL 值)。 | ||
| - FILL: 类型可选 NONE (不填充)、VALUE(指定值填充)、PREV(前一个有效值)、NEXT(后一个有效值)、NEAR(最近邻的有效值)、LINEAR(线性插值);注意,NULL值能否被认定为有效数据取决于 interp 函数的 ignore_null_values 参数。 |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
| ``` | ||
|
|
||
| **功能说明**:返回指定时间截面指定列的记录值或插值。ignore_null_values 参数的值可以是 0 或 1,为 1 时表示忽略 NULL 值,缺省值为 0。 | ||
| **功能说明**:返回指定时间截面指定列的记录值或插值。ignore_null_values 参数的值可以是 0 或 1,为 1 时表示忽略 NULL 值,缺省值为 0。当 ignore_null_values 为1时,插值时将会忽略其他 NULL 值采样数据。 |
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
This comment was marked as resolved.
This comment was marked as resolved.
Sorry, something went wrong.
2e97edd to
85862ef
Compare
| 1. 不进行填充:NONE(默认填充模式)。 | ||
| 2. VALUE 填充:固定值填充,此时需要指定填充的数值。例如 `FILL(VALUE, 1.23)`。这里需要注意,最终填充的值受由相应列的类型决定,如 `FILL(VALUE, 1.23)`,相应列为 INT 类型,则填充值为 1,若查询列表中有多列需要 FILL,则需要给每一个 FILL 列指定 VALUE,如 `SELECT _wstart, min(c1), max(c1) FROM ... FILL(VALUE, 0, 0)`,注意,SELECT 表达式中只有包含普通列时才需要指定 FILL VALUE,如 `_wstart`、`_wstart+1a`、`now`、`1+1` 以及使用 `partition by` 时的 `partition key` (如 tbname) 都不需要指定 VALUE,如 `timediff(last(ts), _wstart)` 则需要指定 VALUE。 | ||
| 3. PREV 填充:使用前一个值填充数据。例如 FILL(PREV)。 | ||
| 3. PREV 填充:使用前一个有效值填充数据。例如 FILL(PREV)。 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
在这里把有效值的含义也分别说一下吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed
| cond->type = TIMEWINDOW_RANGE_EXTERNAL; | ||
| code = getQueryExtWindow(&cond->twindows, &pInfo->win, &cond->twindows, cond->extTwindows); | ||
| QUERY_CHECK_CODE(code, lino, _error); | ||
| setOperatorNotifyFn(downstream, scanOptrNotifyReaderStepDone); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个应该在各个算子里自己设置自己的吧
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
notify方法可能会受上游算子的影响,不同的上游算子可能notify的内容不一样,所以在上游的interp算子里设置了。
但是tablescan算子里自己设置了,因为qnode和vnode分离的时候,interp算子无法为tablescan算子设置这个接口,以后如果还有其他的notify函数的话这会是个问题。
| " since:%s", __func__, lino, sourceIdx, tstrerror(code)); | ||
| } | ||
| taosMemoryFree(msg); | ||
| taosMemoryFree(pMsgSendInfo); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
需要确认这种用法有没有问题
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
有问题,已经修改为noResp形式的rpcMsg接口
| @param notifyTs the timestamp to notify, used to determine whether | ||
| to mark the current step as done | ||
| */ | ||
| int32_t tsdbReaderStepDone(STsdbReader* pReader, int64_t notifyTs) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
这个函数跟scan算子有可能在两个线程上执行,需要有并发访问控制
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
- 通过pReader内部的互斥锁控制query和notify在不同线程并发访问
- 在释放taskHandle和pReader前给taskCtx上写锁来避免use-after-free error
…mprove query structure
…cases for filling nulls
…n and optimize memory allocation
43f250a to
1a9c638
Compare
Description
Issue(s)
resolve: https://project.feishu.cn/taosdata_td/feature/detail/6506970855
Checklist
Please check the items in the checklist if applicable.