Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add a RetryInterval setting #1654

Draft
wants to merge 3 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions api/json-schema/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -19144,6 +19144,10 @@
"readTimeout": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Read timeout for all the vertices in the pipeline, can be overridden by the vertex's limit settings"
},
"retryInterval": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "RetryInterval is the wait time before retrying a batch after getting an error from a user defined processor or ISBSVC."
}
},
"type": "object"
Expand Down Expand Up @@ -19832,6 +19836,10 @@
"readTimeout": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "Read timeout duration from the source or buffer It overrides the settings from pipeline limits."
},
"retryInterval": {
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration",
"description": "RetryInterval is the duration waited for before retrying after a UDF processing or buffer writing. It overrides the settings from the pipeline limits."
}
},
"type": "object"
Expand Down
8 changes: 8 additions & 0 deletions api/openapi-spec/swagger.json
Original file line number Diff line number Diff line change
Expand Up @@ -19131,6 +19131,10 @@
"readTimeout": {
"description": "Read timeout for all the vertices in the pipeline, can be overridden by the vertex's limit settings",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"retryInterval": {
"description": "RetryInterval is the wait time before retrying a batch after getting an error from a user defined processor or ISBSVC.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
}
}
},
Expand Down Expand Up @@ -19810,6 +19814,10 @@
"readTimeout": {
"description": "Read timeout duration from the source or buffer It overrides the settings from pipeline limits.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
},
"retryInterval": {
"description": "RetryInterval is the duration waited for before retrying after a UDF processing or buffer writing. It overrides the settings from the pipeline limits.",
"$ref": "#/definitions/io.k8s.apimachinery.pkg.apis.meta.v1.Duration"
}
}
},
Expand Down
5 changes: 5 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_pipelines.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,9 @@ spec:
readTimeout:
default: 1s
type: string
retryInterval:
default: 0.001s
type: string
type: object
sideInputs:
items:
Expand Down Expand Up @@ -5902,6 +5905,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
metadata:
properties:
Expand Down
10 changes: 10 additions & 0 deletions config/base/crds/full/numaflow.numaproj.io_vertices.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -679,6 +679,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
fromVertexPartitionCount:
format: int32
Expand All @@ -705,6 +707,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
toVertexPartitionCount:
format: int32
Expand Down Expand Up @@ -1548,6 +1552,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
metadata:
properties:
Expand Down Expand Up @@ -3756,6 +3762,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
fromVertexPartitionCount:
format: int32
Expand All @@ -3782,6 +3790,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
toVertexPartitionCount:
format: int32
Expand Down
15 changes: 15 additions & 0 deletions config/install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2749,6 +2749,9 @@ spec:
readTimeout:
default: 1s
type: string
retryInterval:
default: 0.001s
type: string
type: object
sideInputs:
items:
Expand Down Expand Up @@ -8511,6 +8514,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
metadata:
properties:
Expand Down Expand Up @@ -12488,6 +12493,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
fromVertexPartitionCount:
format: int32
Expand All @@ -12514,6 +12521,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
toVertexPartitionCount:
format: int32
Expand Down Expand Up @@ -13357,6 +13366,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
metadata:
properties:
Expand Down Expand Up @@ -15565,6 +15576,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
fromVertexPartitionCount:
format: int32
Expand All @@ -15591,6 +15604,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
toVertexPartitionCount:
format: int32
Expand Down
15 changes: 15 additions & 0 deletions config/namespace-install.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2749,6 +2749,9 @@ spec:
readTimeout:
default: 1s
type: string
retryInterval:
default: 0.001s
type: string
type: object
sideInputs:
items:
Expand Down Expand Up @@ -8511,6 +8514,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
metadata:
properties:
Expand Down Expand Up @@ -12488,6 +12493,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
fromVertexPartitionCount:
format: int32
Expand All @@ -12514,6 +12521,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
toVertexPartitionCount:
format: int32
Expand Down Expand Up @@ -13357,6 +13366,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
metadata:
properties:
Expand Down Expand Up @@ -15565,6 +15576,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
fromVertexPartitionCount:
format: int32
Expand All @@ -15591,6 +15604,8 @@ spec:
type: integer
readTimeout:
type: string
retryInterval:
type: string
type: object
toVertexPartitionCount:
format: int32
Expand Down
29 changes: 29 additions & 0 deletions docs/APIs.md
Original file line number Diff line number Diff line change
Expand Up @@ -3528,6 +3528,20 @@ the vertex’s limit settings
</p>
</td>
</tr>
<tr>
<td>
<code>retryInterval</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
RetryInterval is the wait time before retrying a batch after getting an
error from a user defined processor or ISBSVC.
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.PipelinePhase">
Expand Down Expand Up @@ -5353,6 +5367,21 @@ overrides the settings from pipeline limits.
</p>
</td>
</tr>
<tr>
<td>
<code>retryInterval</code></br> <em>
<a href="https://pkg.go.dev/k8s.io/apimachinery/pkg/apis/meta/v1#Duration">
Kubernetes meta/v1.Duration </a> </em>
</td>
<td>
<em>(Optional)</em>
<p>
RetryInterval is the duration waited for before retrying after a UDF
processing or buffer writing. It overrides the settings from the
pipeline limits.
</p>
</td>
</tr>
</tbody>
</table>
<h3 id="numaflow.numaproj.io/v1alpha1.VertexPhase">
Expand Down
3 changes: 3 additions & 0 deletions docs/user-guide/reference/pipeline-tuning.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ processing cycle.
- `readBatchSize` - How many messages to read for each cycle, defaults to `500`.
- `bufferMaxLength` - How many unprocessed messages can be existing in the Inter-Step Buffer, defaults to `30000`.
- `bufferUsageLimit` - The percentage of the buffer usage limit, a valid number should be less than 100. Default value is `80`, which means `80%`.
- `retryInterval` - The time to wait before retrying after a failure of the UDF or of the ISBSVC.

These parameters can be customized under `spec.limits` as below, once defined, they apply to all the vertices and Inter-Step Buffers of the pipeline.

Expand All @@ -20,6 +21,7 @@ spec:
readBatchSize: 100
bufferMaxLength: 30000
bufferUsageLimit: 85
retryInterval: 0.05s
```

They also can be defined in a vertex level, which will override the pipeline level settings.
Expand All @@ -34,6 +36,7 @@ spec:
readBatchSize: 100
bufferMaxLength: 30000
bufferUsageLimit: 85
retryInterval: 0.05s
vertices:
- name: in
source:
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ require (
github.com/grpc-ecosystem/grpc-gateway v1.16.0
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/imdario/mergo v0.3.13
github.com/kylelemons/godebug v1.1.0
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nats.go v1.33.1
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -585,6 +585,8 @@ github.com/kr/pty v1.1.5/go.mod h1:9r2w37qlBe7rQ6e1fg1S/9xpWHSnaqNdHD3WcMdbPDA=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/leodido/go-urn v1.2.1/go.mod h1:zt4jvISO2HfUBqxjfIshjdMTYS56ZS/qv49ictyFfxY=
github.com/leodido/go-urn v1.2.4 h1:XlAE/cm/ms7TE/VMVoduSpNBoyc2dOxHs5MZSwAN63Q=
github.com/leodido/go-urn v1.2.4/go.mod h1:7ZrI8mTSeBSHl/UaRyKQW1qZeMgak41ANeCNaVckg+4=
Expand Down
3 changes: 3 additions & 0 deletions pkg/apis/numaflow/v1alpha1/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,9 @@ const (
DefaultBufferUsageLimit = 0.8
DefaultReadBatchSize = 500

// Time to wait before a retry for datum batches on user defined processors or isbsvc errors.
DefaultRetryInterval = time.Millisecond

// Auto scaling
DefaultLookbackSeconds = 120 // Default lookback seconds for calculating avg rate and pending
DefaultCooldownSeconds = 90 // Default cooldown seconds after a scaling operation
Expand Down
Loading
Loading