Skip to content

Commit 2cd6361

Browse files
RocMarshal and XComp committed
[FLINK-38896][runtime/rest] Introduce the /jobs/:jobid/rescales/summary endpoint in the REST API
Co-authored-by: XComp <mpohl@confluent.io>
1 parent 6b96484 commit 2cd6361

7 files changed

Lines changed: 612 additions & 0 deletions

File tree

docs/layouts/shortcodes/generated/rest_v1_dispatcher.html

Lines changed: 107 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4252,6 +4252,113 @@
42524252
</tr>
42534253
</tbody>
42544254
</table>
4255+
<table class="rest-api table table-bordered">
4256+
<tbody>
4257+
<tr>
4258+
<td class="text-left" colspan="2"><h5><strong>/jobs/:jobid/rescales/summary</strong></h5></td>
4259+
</tr>
4260+
<tr>
4261+
<td class="text-left" style="width: 20%">Verb: <code>GET</code></td>
4262+
<td class="text-left">Response code: <code>200 OK</code></td>
4263+
</tr>
4264+
<tr>
4265+
<td colspan="2">Return job rescales summary.</td>
4266+
</tr>
4267+
<tr>
4268+
<td colspan="2">Path parameters</td>
4269+
</tr>
4270+
<tr>
4271+
<td colspan="2">
4272+
<ul>
4273+
<li><code>jobid</code> - 32-character hexadecimal string value that identifies a job.</li>
4274+
</ul>
4275+
</td>
4276+
</tr>
4277+
<tr>
4278+
<td colspan="2">
4279+
<label>
4280+
<details>
4281+
<summary>Request</summary>
4282+
<pre><code>{}</code></pre>
4283+
</label>
4284+
</td>
4285+
</tr>
4286+
<tr>
4287+
<td colspan="2">
4288+
<label>
4289+
<details>
4290+
<summary>Response</summary>
4291+
<pre><code>{
4292+
"type" : "object",
4293+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesSummary",
4294+
"properties" : {
4295+
"completedRescalesDurationStatsInMillis" : {
4296+
"type" : "object",
4297+
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
4298+
},
4299+
"failedRescalesDurationStatsInMillis" : {
4300+
"type" : "object",
4301+
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
4302+
},
4303+
"ignoredRescalesDurationStatsInMillis" : {
4304+
"type" : "object",
4305+
"$ref" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto"
4306+
},
4307+
"rescalesCounts" : {
4308+
"type" : "object",
4309+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:rescales:JobRescalesOverview:RescalesCounts",
4310+
"properties" : {
4311+
"completed" : {
4312+
"type" : "integer"
4313+
},
4314+
"failed" : {
4315+
"type" : "integer"
4316+
},
4317+
"ignored" : {
4318+
"type" : "integer"
4319+
},
4320+
"inProgress" : {
4321+
"type" : "integer"
4322+
}
4323+
}
4324+
},
4325+
"rescalesDurationStatsInMillis" : {
4326+
"type" : "object",
4327+
"id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:util:stats:StatsSummaryDto",
4328+
"properties" : {
4329+
"avg" : {
4330+
"type" : "integer"
4331+
},
4332+
"max" : {
4333+
"type" : "integer"
4334+
},
4335+
"min" : {
4336+
"type" : "integer"
4337+
},
4338+
"p50" : {
4339+
"type" : "number"
4340+
},
4341+
"p90" : {
4342+
"type" : "number"
4343+
},
4344+
"p95" : {
4345+
"type" : "number"
4346+
},
4347+
"p99" : {
4348+
"type" : "number"
4349+
},
4350+
"p999" : {
4351+
"type" : "number"
4352+
}
4353+
}
4354+
}
4355+
}
4356+
}</code></pre>
4357+
</label>
4358+
</td>
4359+
</tr>
4360+
</tbody>
4361+
</table>
42554362
<table class="rest-api table table-bordered">
42564363
<tbody>
42574364
<tr>

docs/static/generated/rest_v1_dispatcher.yml

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -996,6 +996,24 @@ paths:
996996
application/json:
997997
schema:
998998
$ref: "#/components/schemas/JobRescalesOverview"
999+
/jobs/{jobid}/rescales/summary:
1000+
get:
1001+
description: Return job rescales summary.
1002+
operationId: getJobRescalesSummary
1003+
parameters:
1004+
- name: jobid
1005+
in: path
1006+
description: 32-character hexadecimal string value that identifies a job.
1007+
required: true
1008+
schema:
1009+
$ref: "#/components/schemas/JobID"
1010+
responses:
1011+
"200":
1012+
description: The request was successful.
1013+
content:
1014+
application/json:
1015+
schema:
1016+
$ref: "#/components/schemas/JobRescalesSummary"
9991017
/jobs/{jobid}/rescaling:
10001018
patch:
10011019
description: Triggers the rescaling of a job. This async operation would return
@@ -2875,6 +2893,19 @@ components:
28752893
$ref: "#/components/schemas/LatestRescales"
28762894
rescalesCounts:
28772895
$ref: "#/components/schemas/RescalesCounts"
2896+
JobRescalesSummary:
2897+
type: object
2898+
properties:
2899+
completedRescalesDurationStatsInMillis:
2900+
$ref: "#/components/schemas/StatsSummaryDto"
2901+
failedRescalesDurationStatsInMillis:
2902+
$ref: "#/components/schemas/StatsSummaryDto"
2903+
ignoredRescalesDurationStatsInMillis:
2904+
$ref: "#/components/schemas/StatsSummaryDto"
2905+
rescalesCounts:
2906+
$ref: "#/components/schemas/RescalesCounts"
2907+
rescalesDurationStatsInMillis:
2908+
$ref: "#/components/schemas/StatsSummaryDto"
28782909
JobResourceRequirementsBody:
28792910
type: object
28802911
additionalProperties:
Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.flink.runtime.rest.handler.job.rescales;
20+
21+
import org.apache.flink.api.common.JobID;
22+
import org.apache.flink.runtime.rest.handler.HandlerRequest;
23+
import org.apache.flink.runtime.rest.handler.RestHandlerException;
24+
import org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler;
25+
import org.apache.flink.runtime.rest.handler.legacy.ExecutionGraphCache;
26+
import org.apache.flink.runtime.rest.messages.EmptyRequestBody;
27+
import org.apache.flink.runtime.rest.messages.ErrorResponseBody;
28+
import org.apache.flink.runtime.rest.messages.JobIDPathParameter;
29+
import org.apache.flink.runtime.rest.messages.JobMessageParameters;
30+
import org.apache.flink.runtime.rest.messages.MessageHeaders;
31+
import org.apache.flink.runtime.rest.messages.ResponseBody;
32+
import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummary;
33+
import org.apache.flink.runtime.rest.messages.job.rescales.JobRescalesSummaryHeaders;
34+
import org.apache.flink.runtime.scheduler.ExecutionGraphInfo;
35+
import org.apache.flink.runtime.scheduler.adaptive.timeline.RescalesStatsSnapshot;
36+
import org.apache.flink.runtime.webmonitor.RestfulGateway;
37+
import org.apache.flink.runtime.webmonitor.history.ArchivedJson;
38+
import org.apache.flink.runtime.webmonitor.history.JsonArchivist;
39+
import org.apache.flink.runtime.webmonitor.retriever.GatewayRetriever;
40+
41+
import org.apache.flink.shaded.netty4.io.netty.handler.codec.http.HttpResponseStatus;
42+
43+
import java.io.IOException;
44+
import java.time.Duration;
45+
import java.util.Collection;
46+
import java.util.List;
47+
import java.util.Map;
48+
import java.util.concurrent.Executor;
49+
50+
/** Handler to response job rescales summary. */
51+
public class JobRescalesSummaryHandler
52+
extends AbstractExecutionGraphHandler<JobRescalesSummary, JobMessageParameters>
53+
implements JsonArchivist {
54+
55+
public JobRescalesSummaryHandler(
56+
GatewayRetriever<? extends RestfulGateway> leaderRetriever,
57+
Duration timeout,
58+
Map<String, String> responseHeaders,
59+
MessageHeaders<EmptyRequestBody, JobRescalesSummary, JobMessageParameters>
60+
messageHeaders,
61+
ExecutionGraphCache executionGraphCache,
62+
Executor executor) {
63+
super(
64+
leaderRetriever,
65+
timeout,
66+
responseHeaders,
67+
messageHeaders,
68+
executionGraphCache,
69+
executor);
70+
}
71+
72+
@Override
73+
protected JobRescalesSummary handleRequest(
74+
HandlerRequest<EmptyRequestBody> request, ExecutionGraphInfo executionGraphInfo)
75+
throws RestHandlerException {
76+
return getJobRescalesSummary(executionGraphInfo);
77+
}
78+
79+
private JobRescalesSummary getJobRescalesSummary(ExecutionGraphInfo executionGraphInfo)
80+
throws RestHandlerException {
81+
RescalesStatsSnapshot rescalesStatsSnapshot = executionGraphInfo.getRescalesStatsSnapshot();
82+
JobID jobId = executionGraphInfo.getJobId();
83+
84+
if (rescalesStatsSnapshot == null) {
85+
throw new RestHandlerException(
86+
"AdaptiveScheduler rescales and max rescales history were both not enabled for job "
87+
+ jobId
88+
+ '.',
89+
HttpResponseStatus.NOT_FOUND);
90+
}
91+
92+
return JobRescalesSummary.fromRescalesStatsSnapshot(rescalesStatsSnapshot);
93+
}
94+
95+
@Override
96+
public Collection<ArchivedJson> archiveJsonWithPath(ExecutionGraphInfo executionGraphInfo)
97+
throws IOException {
98+
ResponseBody response;
99+
try {
100+
response = getJobRescalesSummary(executionGraphInfo);
101+
} catch (RestHandlerException rhe) {
102+
response = new ErrorResponseBody(rhe.getMessage());
103+
}
104+
return List.of(
105+
new ArchivedJson(
106+
JobRescalesSummaryHeaders.getInstance()
107+
.getTargetRestEndpointURL()
108+
.replace(
109+
':' + JobIDPathParameter.KEY,
110+
executionGraphInfo.getJobId().toString()),
111+
response));
112+
}
113+
}

0 commit comments

Comments
 (0)