[pull] master from sourcegraph:master #712

Open · wants to merge 6,305 commits into base: master from sourcegraph:master

Changes from 1 commit (6,305 commits total)
7eb956e
Add alerts for mean db blocked seconds (#22822)
daxmc99 Jul 16, 2021
3329cdf
monitoring: migrate monitoring docs to Sourcegraph API Docs (#22876)
bobheadxi Jul 16, 2021
9d7cbea
Code insights: Implement delete insights with new dashboard functiona…
vovakulikov Jul 16, 2021
fda46f8
Docs: Update adding data to pings (#22909)
attfarhan Jul 16, 2021
d0742a2
Add documentation on repo table row-level security (#22746)
flying-robot Jul 16, 2021
6631e17
monitoring: PanelTypes API cleanup (#22879)
bobheadxi Jul 16, 2021
9f7ca9f
Add Instrumentation Error to FAQ (#22885)
abeatrix Jul 16, 2021
f641bd9
doc: symlink CHANGELOG.md to docs for better search results (#22918)
bobheadxi Jul 16, 2021
8a8c492
batches: fix the UI draft → UI published transition (#22906)
LawnGnome Jul 16, 2021
fd661df
batches: add a publish bulk action (#22908)
LawnGnome Jul 16, 2021
417e319
docs: Fix typos (#22933)
efritz Jul 16, 2021
b74c3b1
codeintel: Consider dependency graph for LSIF data retention (#22930)
efritz Jul 16, 2021
0688657
search: simplify pattern validation (#22938)
rvantonder Jul 16, 2021
6d5f537
search: lift ad-hoc pattern validation to parsing (#22939)
rvantonder Jul 16, 2021
4f44028
search: lift ad-hoc parameter validation to parsing (#22940)
rvantonder Jul 16, 2021
f1e6c61
search: factor out function for structural search path (#22943)
rvantonder Jul 16, 2021
ab51711
batches: add docs and changelog for publish from UI (#22935)
LawnGnome Jul 16, 2021
258ffc8
repos: Don't cancel context on unauth / forbidden / accountsuspended …
Jul 17, 2021
5db2712
chore: update third-party license (#22950)
sourcegraph-bot Jul 19, 2021
cdd47a0
web: move typography and button styles out from the `.theme-redesign`…
valerybugakov Jul 19, 2021
edab713
search: add RepoOptions to TextParameters (#22914)
stefanhengl Jul 19, 2021
e31288f
Attempt #3: upgrade to `Webpack` v5 (#22580)
valerybugakov Jul 19, 2021
c167754
EventLogger: Return sourcegraph.com hostname only for internal referr…
attfarhan Jul 19, 2021
ea4f259
executor: pass HOME/PATH to src-cli if Firecracker is disabled (#22911)
mrnugget Jul 19, 2021
a2cc571
web: change breader font-weight (#22953)
valerybugakov Jul 19, 2021
69cced7
web: change code monitoring form label margins (#22954)
valerybugakov Jul 19, 2021
6ee0a57
repos: Make streaming syncer the default (#22948)
Jul 19, 2021
04ed792
This message seems misleading (#22960)
Joelkw Jul 19, 2021
fb81c25
search: lower case status string for honeycomb (#22961)
keegancsmith Jul 19, 2021
3ea6a85
Code Insights: Tweak global scope error message (#22962)
Joelkw Jul 19, 2021
933b749
Add new type of dashboard initial state for the built in and settings…
vovakulikov Jul 19, 2021
ced98c1
search notebook: client infrastructure (#22828)
Jul 19, 2021
c3cef8c
search: return 40 repo filters and 40 other (#22863)
keegancsmith Jul 19, 2021
1d17d01
codeintel: temporarily use sourcegraph-shipped coursier musl binary (…
Strum355 Jul 19, 2021
170c3fa
Second attempt to PR "Add new "JVM Packages" external services type. …
olafurpg Jul 19, 2021
8727f06
Code Insights: Add feature flag the all repos insight mode (#22964)
vovakulikov Jul 19, 2021
a54107f
Trim spaces from license tags (#22838)
eseliger Jul 19, 2021
030c851
Add o3de search page (#22878)
unicodeveloper Jul 19, 2021
8bbd771
Bump src cli version and update cli docs (#22968)
eseliger Jul 19, 2021
bf4ea68
web: load runtime bundle in locally executed integration tests (#22969)
valerybugakov Jul 19, 2021
f153f2a
Code Insights: Fix BE insight loading card flashing (#22967)
vovakulikov Jul 19, 2021
a5d2625
sg: Add `sg rfc [list|search|open]` commands (#22927)
mrnugget Jul 19, 2021
28bf81d
dev(sg): add 'run redis-postgres' via compose (#22921)
bobheadxi Jul 19, 2021
a796052
update zoekt
keegancsmith Jul 16, 2021
0991e4f
internal/search: order results by repository priority (star count).
Jun 16, 2021
e105c27
extensibility: add alert for return of Firefox addon (#22126)
tjkandala Jul 19, 2021
4f8a8d9
extensibility: stabilize filtered extensions on change (#22928)
tjkandala Jul 19, 2021
e7469b7
Revert "extensibility: add alert for return of Firefox addon (#22126)…
tjkandala Jul 19, 2021
2808dfd
codeintel: Do not delete reachable uploads (#22737)
efritz Jul 19, 2021
55009b3
search: remove unused QueryInfo (#22966)
stefanhengl Jul 19, 2021
32945cd
search: Addressing search reference improvements after hallway testin…
fkling Jul 19, 2021
802b161
extensibility: fix browser extension for private code on Cloud (#22694)
tjkandala Jul 19, 2021
b0ecbb1
extensibility: add alert for return of Firefox addon (#22988)
tjkandala Jul 19, 2021
2d86f2e
search/backend: add config to limit reorder buffer size.
Jul 19, 2021
c81ce8e
Fix link to `sg` CLI README.md (#22888)
Kelvin-Lee Jul 19, 2021
c597246
remove references to non-root and merge with non-privilged (#22936)
davejrt Jul 19, 2021
c8b715d
fix webpack 5 for browser extension (#22991)
tjkandala Jul 19, 2021
fe3333b
docs: redesign homepage and update admin/install (#22965)
bobheadxi Jul 19, 2021
9714ddf
Few more quick copy changes (#23002)
Joelkw Jul 20, 2021
493b85d
Quick copy fix (#22946)
Joelkw Jul 20, 2021
8362f41
web(site-admin): disable instrumentation for non-kubernetes deploymen…
bobheadxi Jul 20, 2021
9733597
search notebook UI (#22829)
Jul 20, 2021
653f061
Remove the code insights page (#22987)
vovakulikov Jul 20, 2021
bf9f7d3
Code Insights: Clarify add and remove context menu item (#23005)
Joelkw Jul 20, 2021
1651e2c
Code Insights: Turn off homepage insights (#22989)
vovakulikov Jul 20, 2021
a2a2899
Code Insights: Fix insight dashboard memoization and add extension in…
vovakulikov Jul 20, 2021
d83dd7c
sg: remove `ulimit -n` from cmds, set on boot in `sg` (#23019)
mrnugget Jul 20, 2021
ca1190b
search notebook: always show run search action if no other blocks are…
Jul 20, 2021
d760043
Use correct Java version for JVM dependency repos. (#22874)
olafurpg Jul 20, 2021
d0d639b
batches: add filtered repo batch changes page and badge on repo page …
courier-new Jul 20, 2021
9149925
Update in-app changelog for 3.30 (#22971)
eseliger Jul 20, 2021
417216c
search: Add metrics to search reference (#22982)
fkling Jul 20, 2021
44876ea
Code Insights: Add explanation tooltip on the creation insight form. …
vovakulikov Jul 20, 2021
b1aba20
search: Improve search reference behavior for filters with suggestion…
fkling Jul 20, 2021
7b475da
repos: Don't panic if we find and unknown external service kind (#23024)
ryanslade Jul 20, 2021
6e51f45
security: check site admin before returning site usage stats (#23026)
unknwon Jul 20, 2021
cf4435b
monitoring: Refactor provisioning dashboards (#23006)
efritz Jul 20, 2021
f6b71d3
Code Insights: Add uniqBy check for the be insight gql handler (#23022)
vovakulikov Jul 20, 2021
74cc15d
remove redesign toggles from FileMatch.tsx (#22999)
camdencheek Jul 20, 2021
7ada503
extensibility: refresh extension details page (#22882)
tjkandala Jul 20, 2021
8f1310d
gomod: update zoekt to include meta file updates (#23034)
keegancsmith Jul 20, 2021
603d798
Change Add or remove insights copy (#23036)
felixfbecker Jul 20, 2021
3b14831
search: Remove search-reference feature flag (#22994)
fkling Jul 20, 2021
cad366c
Insights: migrate setting insights to database with a background job …
coury-clark Jul 20, 2021
a421f78
search: lift IsEmpty check higher up call stack 1/2 (#22944)
rvantonder Jul 20, 2021
32573c1
github: automatic repositoryQuery refinement (#22859)
Jul 20, 2021
aa00b87
Add web integration test suite to sg config (#21174)
eseliger Jul 20, 2021
fbc4152
Fix panic in error log (#23042)
eseliger Jul 20, 2021
78c7f96
search: lift IsEmpty check higher up call stack 2/2 (#22997)
rvantonder Jul 20, 2021
5526fcd
monitoring: correct metric for repo_index_state (#23041)
keegancsmith Jul 20, 2021
802482c
API docs: LSIF: add GraphQL backend for "Go to API docs" (#22902)
Jul 20, 2021
f7ac847
extensibility: only show firefox bext alert on firefox (#23048)
tjkandala Jul 20, 2021
0e934e5
flake: disable Puppeteer Search Feature Tour test (#23047)
courier-new Jul 20, 2021
396327b
search frontend: fix regex query highlighting for Firefox (#23043)
rvantonder Jul 20, 2021
cdf9fde
changelog: cut [email protected] (#23055)
daxmc99 Jul 21, 2021
2c450ed
Docs for code insights dashboards in 3.30 (#23004)
Joelkw Jul 21, 2021
784cc6f
Fix regexp toggle size (#23009)
Jul 21, 2021
03e3bec
search: Use query scanner to process/render search reference examples…
fkling Jul 21, 2021
a692cbd
GraphQL Client: Generate types for Apollo `typePolicies` (#22509)
umpox Jul 21, 2021
aaec123
GraphQL Client: Update GraphQL error types (#22721)
umpox Jul 21, 2021
5192c4c
Fix #22925 by removing needless database writes (#23061)
mrnugget Jul 21, 2021
7178aed
Add workspace volume mode to batch changes troubleshooting doc (#23068)
mrnugget Jul 21, 2021
ca1f115
Remove `nav-link` class to keep the insight search-button padding con…
vovakulikov Jul 21, 2021
be3315a
Remove BE-insight changelog line duplicate (#23064)
vovakulikov Jul 21, 2021
c708465
Make repo pages lazy loaded components (#23072)
eseliger Jul 21, 2021
aff392a
authz: Sync user permissions based on external service tokens (#23018)
ryanslade Jul 21, 2021
0f8f3c9
CHANGELOG: Fix typo
Jul 21, 2021
d7c3791
Guard against a nil pointer panic (#23033)
olafurpg Jul 21, 2021
795752a
Don't pass invalid arguments to ignite stop/remove (#22676)
eseliger Jul 21, 2021
dd40117
HubSpot: Send database ID on Cloud signups (#22951)
attfarhan Jul 21, 2021
346c35b
gomod: update zoekt to remove old meta files (#23075)
keegancsmith Jul 21, 2021
2df4b43
Implement auto-inference for Java package repositories. (#23076)
olafurpg Jul 21, 2021
2ca4437
Add JDK sources to "JVM Dependencies" external service type. (#23074)
olafurpg Jul 21, 2021
28e524c
Always pass HOME and PATH to commands run by executor (#23077)
eseliger Jul 21, 2021
326d250
tune down result limits in searcher (#23082)
camdencheek Jul 21, 2021
7afe9c5
extsvc: Add helper method to extract token (#23073)
ryanslade Jul 21, 2021
c942a2e
gqltest: minor tweak of README (#23062)
unknwon Jul 21, 2021
01575ee
sg: add ability to define `checks` per `commandset` (#23020)
mrnugget Jul 21, 2021
19ee94d
Fix feature tour integration tests (#23060)
Jul 21, 2021
070c110
Move heartbeats from executor-queue to executor (#22972)
eseliger Jul 21, 2021
6fa3751
flake: disable Quicklinks regression test (#23051)
courier-new Jul 21, 2021
1a3e7ad
auth: Redirect github oauth login attempt back if user cancels (#23083)
ryanslade Jul 21, 2021
2b6144a
Restrict importing from `enterprise` in OSS codebase (#23044)
courier-new Jul 21, 2021
7337d5b
Update CHANGELOG.md
andreeleuterio Jul 21, 2021
3d0aedd
insights: changing searcher limit to 20k (#23090)
coury-clark Jul 21, 2021
85112ba
Insights: migrate code insights from repo-updater to worker service (…
coury-clark Jul 21, 2021
f902e0a
search: lift IsEmpty check into doResults guards (#23057)
rvantonder Jul 21, 2021
9755572
search: perform IsEmpty check before resolving repos in suggestions (…
rvantonder Jul 21, 2021
6683052
code monitoring: remove w1-signup-optimisation feature flag (#23008)
Jul 21, 2021
47d7fd4
Update CHANGELOG (#20928)
daxmc99 Jul 21, 2021
4571fa4
docs: add 3.30 pure docker release notes (#23100)
ggilmore Jul 21, 2021
f4224ea
release: [email protected] (#23059)
daxmc99 Jul 21, 2021
07944fe
deployment_best_practices.md (#22907)
DaedalusG Jul 22, 2021
8b784e2
Update deployment_best_practices.md (#23107)
DaedalusG Jul 22, 2021
b377701
Fixing one more typo (#23108)
DaedalusG Jul 22, 2021
ec9f1a4
Fixing typos on apostrophes (#23109)
DaedalusG Jul 22, 2021
1a61249
Fix .yaml file link formatting (#23110)
DaedalusG Jul 22, 2021
265e183
add EventPathMatch to streaming API (#22996)
camdencheek Jul 22, 2021
d205147
Prepare for 3.31 release (#23111)
daxmc99 Jul 22, 2021
407908a
rename File to Content in stream events (#23035)
camdencheek Jul 22, 2021
414a6c2
repos: Delete batch syncer (#22949)
Jul 22, 2021
285fca7
search: Index 1M repos on dotcom (#23119)
Jul 22, 2021
30a36bc
monitoring: Tweak oldest sync alert (#23120)
ryanslade Jul 22, 2021
94f994b
Follow up on post-merge code style review (#23123)
olafurpg Jul 22, 2021
70504ca
repos: Log errors in SyncExternalService (#23127)
Jul 22, 2021
c255ca7
Removing reference to Bitbucket perms button (#23103)
emchap Jul 22, 2021
e807a2c
Stream ExecutionLogEntries from executor (#22884)
eseliger Jul 22, 2021
594200b
Don't limit maximum number of dequeued jobs (#23122)
eseliger Jul 22, 2021
b211823
codeintel: Add references to lsif-index-tester (#20036)
tjdevries Jul 22, 2021
11e7f45
Clarifying subdirectory handling for p4 (#23104)
emchap Jul 22, 2021
e12b322
httpcli: add retries to NewExternalHTTPClientFactory (#23131)
Jul 22, 2021
869ad1a
flake: disable Symbols sidebar regression test (#23099)
courier-new Jul 22, 2021
06335c2
search frontend: accurate hovers for multiline query inputs (#23114)
rvantonder Jul 22, 2021
ed51560
search: remove commit_pattern_info.go (#23097)
rvantonder Jul 22, 2021
420f4a8
search: ZoektGlobalSearch mode implied by non-empty query (#23093)
rvantonder Jul 22, 2021
b9044ef
search docs: notify @rvantonder (#23133)
rvantonder Jul 22, 2021
a85071c
Setup restricted db connection pool (#23078)
indradhanush Jul 22, 2021
bcfd067
search: empty patterns imply skipping content/file search (#23094)
rvantonder Jul 22, 2021
2b1c33d
API docs: LSIF: add references/definitions to tree API, not just blob…
Jul 22, 2021
ee7d98a
add a nil check for include patterns (#23138)
camdencheek Jul 22, 2021
099c441
search: remove text_pattern_info.go (#23095)
rvantonder Jul 22, 2021
5893fdc
search: remove excessive duplication for repoHasFile evaluation (#23105)
rvantonder Jul 22, 2021
be12b7b
Update config for 3.31.1 patch release (#23142)
daxmc99 Jul 22, 2021
f443809
batches: don't show batch changes on sourcegraph.com (#23052)
courier-new Jul 22, 2021
8d299cd
insights: introduced query cost and priority to distribute resources …
coury-clark Jul 22, 2021
4f151db
docs: fix typo, bullet formatting on pings doc (#23148)
courier-new Jul 22, 2021
1e9e64e
exclude all directories named "enterprise" from apache license (#23150)
tammy-zhu Jul 23, 2021
9fd8185
dbconn: subscribe to codenotify (#23155)
unknwon Jul 23, 2021
6cec69f
search notebook: render Markdown blobs (#23071)
Jul 23, 2021
3ba0b6c
Guard against a nil pointer dereference (#23124)
olafurpg Jul 23, 2021
5b758c1
sg: add run-set for core app (#23156)
unknwon Jul 23, 2021
230e922
GraphQL Client: Add `useConnection` hook to replace `<FilteredConnect…
umpox Jul 23, 2021
8e75ff3
Use custom postion calculator for the dashboard page contex-menu popo…
vovakulikov Jul 23, 2021
c12d400
database: Increase indexable repos threshold
tsenart Jul 23, 2021
60c2a42
search notebook: improve query block loading state (#23162)
Jul 23, 2021
7b4869a
docs: reorganize docker-compose docs, clean up admin homepage (#23029)
bobheadxi Jul 23, 2021
8aeec8c
security: Instrument access granted for private repos (#23164)
ryanslade Jul 23, 2021
e2d1415
repos: Publish Diff after committing transaction (#23169)
Jul 23, 2021
e834ccd
monitoring: Add helpers to make panels and groups for common metric p…
efritz Jul 23, 2021
759716f
codeintel: Repair broken metrics and standardize differing metrics (#…
efritz Jul 23, 2021
3de7bcd
repos: Make syncer more conservative when deleting repos (#23171)
Jul 23, 2021
d5c98a5
docs: add redirects for docker-compose upgrade/configure pages (#23173)
bobheadxi Jul 23, 2021
2d57233
httpcli: Only log retries (#23176)
Jul 23, 2021
ab6d1c6
fix vuln in libgcrypt (#23174)
davejrt Jul 23, 2021
14a3c03
Add CHANGELOG entry for libgcrypt (#23178)
daxmc99 Jul 23, 2021
208c3e1
Update CHANGELOG
tsenart Jul 23, 2021
addd495
adding note about disabled code insights on dot-com (#23177)
coury-clark Jul 23, 2021
b2a1397
add note about non-root overlays (#23179)
davejrt Jul 23, 2021
6032c83
repos: Add DISABLE_STATUS_MESSAGES_REPOS_COUNTING env var
tsenart Jul 23, 2021
3d08088
search: remove `w0-signup-optimisation` feature flag (#23037)
Jul 23, 2021
e99c2dc
search: lift up empty pattern check for symbols (#23149)
rvantonder Jul 23, 2021
6b9dcc6
monitoring: Fix sloppy field usage (#23185)
efritz Jul 23, 2021
0ccbfeb
codeintel: Fix inconsistent metric names (#23184)
efritz Jul 23, 2021
db8f540
search: move zoekt query conversion function to search package (#23153)
rvantonder Jul 23, 2021
f692c6a
Add warning about 3.30 to CHANGELOG (#23183)
daxmc99 Jul 23, 2021
807643e
codeintel: Update settings UI (#23106)
efritz Jul 23, 2021
e19a345
search: promote zoekt query conversion (#23154)
rvantonder Jul 23, 2021
18f04f9
Clarify use case for doc (#23181)
northyg Jul 23, 2021
f3a1f90
release: [email protected] (#23151)
daxmc99 Jul 23, 2021
b6fc13a
Update release-config (#23196)
daxmc99 Jul 23, 2021
0db8621
Remove previous postgres images (#23102)
daxmc99 Jul 23, 2021
96bb87a
monitoring: Update dashboards (#23091)
efritz Jul 25, 2021
16586b1
chore: update third-party license (#23215)
sourcegraph-bot Jul 26, 2021
81bae56
chore: fix typo in third-party-licenses commit messages (#23216)
bobheadxi Jul 26, 2021
a0bb6e9
sg: Move everything Command/exec.Cmd related to run package (#23175)
mrnugget Jul 26, 2021
f010159
CI: bump libxml2 and redis (#23221)
stefanhengl Jul 26, 2021
434f470
search: Add operators to search reference (#23116)
fkling Jul 26, 2021
dbb105c
Cleanup executor queue and move state to DB (#23165)
eseliger Jul 26, 2021
1c68e06
extensibility: use sideloaded settings schema over remote (#23207)
tjkandala Jul 26, 2021
9ea347b
search: Fix search reference filter filter (#23224)
fkling Jul 26, 2021
0aafe84
search: split global indexed search in public/private queries (#23118)
stefanhengl Jul 26, 2021
b867a79
Continuously update executor log entries instead of writing at end (#…
mrnugget Jul 26, 2021
0997c34
fix: Add missing column on lsif_uploads (#23228)
efritz Jul 26, 2021
ca7aad9
Remove access token option in connecting with code hosts (#23227)
Jul 26, 2021
16adb15
codeintel: Relax stalled max age (#23234)
efritz Jul 26, 2021
cf6991b
Do bulk heartbeats in every worker (#23230)
eseliger Jul 26, 2021
61148fb
search: move zoekt query conversion test (#23198)
rvantonder Jul 26, 2021
78ac3db
codeintel: Rate limit auto-indexing requests to repo-updater/gitserve…
efritz Jul 26, 2021
a2f368c
fix: Fix panic with log15 call (#23245)
efritz Jul 26, 2021
124ba7a
saved searches: convert form to function component and add storybook …
Jul 26, 2021
1a19d3c
extensibility: fix GitHub and GitLab hover overlay styles (#23206)
tjkandala Jul 26, 2021
d9fc33a
added search limit (#23197)
Joelkw Jul 26, 2021
e6fbc8e
dev(release): rename campaigns to batch changes (#23200)
bobheadxi Jul 27, 2021
6e24dfb
insights: added graphQL arguments to filter an insights query (#23256)
coury-clark Jul 27, 2021
7230f90
Fix search reference changelog entry (#23261)
fkling Jul 27, 2021
d362f5c
security: Log AccessGranted events when repo accessed directly (#23168)
ryanslade Jul 27, 2021
ab83f9d
search: Remove superfluous console.log statement (#23263)
fkling Jul 27, 2021
07977dd
Refine error handling for JVM dependencies that fail to resolve. (#23…
olafurpg Jul 27, 2021
3ddddc2
Fuzzy finder: don't force URL to use git SHA revision (#22133)
olafurpg Jul 27, 2021
eea18b7
output: properly reset StatusBar in Resetf (#23266)
mrnugget Jul 27, 2021
4126506
Remove unnecessary log message (#23267)
eseliger Jul 27, 2021
4bab7d6
Add namespace selector for create batch spec execution page (#22934)
eseliger Jul 27, 2021
713819a
Incorporate executor-queue into frontend server (#23239)
eseliger Jul 27, 2021
d062eee
metrics: Fix a possible divide by zero error in sql statement (#23269)
ryanslade Jul 27, 2021
77cc501
Typo in unfinished_migrations.md (#23254)
Kelvin-Lee Jul 27, 2021
9c1dd35
Attempting full explanation of select feature (#23250)
emchap Jul 27, 2021
3266f37
monitoring: Also show aggregate metrics if split via By (#23237)
efritz Jul 27, 2021
88f2a40
search: simplify repo rev callback resolution (#23242)
rvantonder Jul 27, 2021
38343de
search: pass file limit to zoekt search options (#23253)
rvantonder Jul 27, 2021
d2b78b1
Update CHANGELOG for Graceful termination periods (#23278)
daxmc99 Jul 27, 2021
b41d042
Insights: fetch backend insights from database instead of directly fr…
coury-clark Jul 27, 2021
78084d4
Add warning on 3.30 upgrade (#23290)
daxmc99 Jul 27, 2021
eb85c0f
batches: restore global nav item on sourcegraph.com (#23259)
courier-new Jul 28, 2021
c4f3c59
web: improve code block border consistency (#23262)
courier-new Jul 28, 2021
79e0d41
symbols: update go-sqlite3 v1.14.8 (#23282)
keegancsmith Jul 28, 2021
6a647c7
gitserver: use os.Remove in testRepoCorrupter (#23283)
keegancsmith Jul 28, 2021
6f4d327
search: don't surface ctx error from repo resolution
stefanhengl Jul 28, 2021
ffa3935
CONTRIBUTING: mention contributing to enterprise code, fix CoC link, …
Apr 12, 2022
9139f5d
correct the code of conduct link to the Community Code of Conduct
tammy-zhu Apr 12, 2022
ee4f6d6
Fix GitLab project name with new navigation enabled (#53532)
iamphill Aug 30, 2023
Cleanup executor queue and move state to DB (sourcegraph#23165)
This PR updates the executor queue so that it no longer holds state in memory, and removes the now-redundant cleanup logic. We no longer need to track records manually, because the executor itself now sends proper heartbeats, so most of that logic can be dropped in favor of the resetter. Additionally, there is now one handler per queue, which makes records easier to track because their IDs are unique within a handler (they weren't unique across queues before). Using the executorName as the worker_hostname, we can get all the relevant information from the DB instead of holding it in memory. This also extends dbworker.Store with additional options to validate that the worker is still the correct owner of the record (previously this was done by checking the in-memory map).
eseliger authored Jul 26, 2021

commit dbb105c6c5fd723263ce7ec72e10a28686a13563
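
For readers skimming the diffs below, the essential pattern of this change is that each queue now gets its own handler wrapping a single dbworker store, and every state transition passes the executor name as a WorkerHostname option, so record ownership is enforced by the database rather than by an in-memory map. The following is a minimal, self-contained sketch of that pattern; the Store interface and MarkFinalOptions type here are simplified stand-ins for the real workerutil/dbworker packages, not the actual Sourcegraph APIs.

```go
package main

import (
	"context"
	"errors"
	"fmt"
)

// MarkFinalOptions mirrors the option shown in the diff: the store uses
// WorkerHostname to ensure that only the executor that dequeued a record may
// finalize it. (Simplified stand-in for dbworker/store.MarkFinalOptions.)
type MarkFinalOptions struct {
	WorkerHostname string
}

// Store is a hypothetical subset of the dbworker store used by the handler.
type Store interface {
	MarkComplete(ctx context.Context, id int, opts MarkFinalOptions) (bool, error)
}

var ErrUnknownJob = errors.New("unknown job")

// handler owns exactly one queue's store; there is no shared in-memory job map.
type handler struct {
	store Store
}

// markComplete delegates the ownership check to the database: if the record is
// not owned by executorName (or is no longer being processed), the store
// reports !ok and the handler translates that into ErrUnknownJob.
func (h *handler) markComplete(ctx context.Context, executorName string, jobID int) error {
	ok, err := h.store.MarkComplete(ctx, jobID, MarkFinalOptions{WorkerHostname: executorName})
	if err != nil {
		return err
	}
	if !ok {
		return ErrUnknownJob
	}
	return nil
}

// fakeStore simulates the database-side ownership check for this sketch.
type fakeStore struct{ owner map[int]string }

func (s *fakeStore) MarkComplete(_ context.Context, id int, opts MarkFinalOptions) (bool, error) {
	return s.owner[id] == opts.WorkerHostname, nil
}

func main() {
	h := &handler{store: &fakeStore{owner: map[int]string{42: "executor-a"}}}
	fmt.Println(h.markComplete(context.Background(), "executor-a", 42)) // <nil>
	fmt.Println(h.markComplete(context.Background(), "executor-b", 42)) // unknown job
}
```

The design consequence is that the database becomes the single source of truth for which executor owns which record, so a restart of the queue service no longer loses that state.
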
21 changes: 4 additions & 17 deletions enterprise/cmd/executor-queue/config.go
@@ -1,35 +1,22 @@
package main

import (
"time"

apiserver "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor-queue/internal/server"
"github.com/sourcegraph/sourcegraph/internal/env"
)

type Config struct {
env.BaseConfig

Port int
JobRequeueDelay time.Duration
JobCleanupInterval time.Duration
MaximumNumMissedHeartbeats int
Port int
}

func (c *Config) Load() {
c.Port = c.GetInt("EXECUTOR_QUEUE_API_PORT", "3191", "The port to listen on.")
c.JobRequeueDelay = c.GetInterval("EXECUTOR_QUEUE_JOB_REQUEUE_DELAY", "1m", "The requeue delay of jobs assigned to an unreachable executor.")
c.JobCleanupInterval = c.GetInterval("EXECUTOR_QUEUE_JOB_CLEANUP_INTERVAL", "10s", "Interval between cleanup runs.")
c.MaximumNumMissedHeartbeats = c.GetInt("EXECUTOR_QUEUE_MAXIMUM_NUM_MISSED_HEARTBEATS", "5", "The number of heartbeats an executor must miss to be considered unreachable.")
}

func (c *Config) ServerOptions(queueOptions map[string]apiserver.QueueOptions) apiserver.Options {
return apiserver.Options{
Port: c.Port,
QueueOptions: queueOptions,
RequeueDelay: c.JobRequeueDelay,
UnreportedMaxAge: c.JobCleanupInterval * time.Duration(c.MaximumNumMissedHeartbeats),
DeathThreshold: c.JobCleanupInterval * time.Duration(c.MaximumNumMissedHeartbeats),
CleanupInterval: c.JobCleanupInterval,
func (c *Config) ServerOptions() apiserver.ServerOptions {
return apiserver.ServerOptions{
Port: c.Port,
}
}
244 changes: 47 additions & 197 deletions enterprise/cmd/executor-queue/internal/server/handler.go
@@ -3,50 +3,17 @@ package server
import (
"context"
"fmt"
"sync"
"time"

"github.com/cockroachdb/errors"
"github.com/derision-test/glock"
"github.com/inconshreveable/log15"

apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)

type handler struct {
options Options
clock glock.Clock
executors map[string]*executorMeta
m sync.Mutex // protects executors
queueMetrics *QueueMetrics
}

type Options struct {
// Port is the port on which to listen for HTTP connections.
Port int

// QueueOptions is a map from queue name to options specific to that queue.
QueueOptions map[string]QueueOptions

// RequeueDelay controls how far into the future to make a job record visible to the job
// queue once the currently processing executor has become unresponsive.
RequeueDelay time.Duration

// UnreportedMaxAge is the maximum time between a record being dequeued and it appearing
// in the executor's heartbeat requests before it being considered lost.
UnreportedMaxAge time.Duration

// DeathThreshold is the minimum time since the last heartbeat of an executor before that
// executor can be considered as unresponsive. This should be configured to be longer than
// the duration between heartbeat interval.
DeathThreshold time.Duration

// CleanupInterval is the duration between periodic invocations of Cleanup, which will
// requeue any records that are "lost" according to the thresholds described above.
CleanupInterval time.Duration
QueueOptions
}

type QueueOptions struct {
@@ -58,206 +25,89 @@ type QueueOptions struct {
RecordTransformer func(ctx context.Context, record workerutil.Record) (apiclient.Job, error)
}

type executorMeta struct {
lastUpdate time.Time
jobs []jobMeta
}

type jobMeta struct {
queueName string
record workerutil.Record
started time.Time
}

func newHandler(options Options, clock glock.Clock) *handler {
return newHandlerWithMetrics(options, clock, &observation.TestContext)
}

func newHandlerWithMetrics(options Options, clock glock.Clock, observationContext *observation.Context) *handler {
func newHandler(queueOptions QueueOptions) *handler {
return &handler{
options: options,
clock: clock,
executors: map[string]*executorMeta{},
queueMetrics: newQueueMetrics(observationContext),
QueueOptions: queueOptions,
}
}

var (
ErrUnknownQueue = errors.New("unknown queue")
ErrUnknownJob = errors.New("unknown job")
)
var ErrUnknownJob = errors.New("unknown job")

// dequeue selects a job record from the database and stashes metadata including
// the job record and the locking transaction. If no job is available for processing,
// or the server has hit its maximum transactions, a false-valued flag is returned.
func (m *handler) dequeue(ctx context.Context, queueName, executorName, executorHostname string) (_ apiclient.Job, dequeued bool, _ error) {
queueOptions, ok := m.options.QueueOptions[queueName]
if !ok {
return apiclient.Job{}, false, ErrUnknownQueue
}

record, dequeued, err := queueOptions.Store.Dequeue(context.Background(), executorHostname, nil)
// a false-valued flag is returned.
func (h *handler) dequeue(ctx context.Context, executorName, executorHostname string) (_ apiclient.Job, dequeued bool, _ error) {
// We explicitly DON'T want to use executorHostname here, it is NOT guaranteed to be unique.
record, dequeued, err := h.Store.Dequeue(ctx, executorName, nil)
if err != nil {
return apiclient.Job{}, false, err
}
if !dequeued {
return apiclient.Job{}, false, nil
}

job, err := queueOptions.RecordTransformer(ctx, record)
job, err := h.RecordTransformer(ctx, record)
if err != nil {
if _, err := queueOptions.Store.MarkFailed(ctx, record.RecordID(), fmt.Sprintf("failed to transform record: %s", err)); err != nil {
if _, err := h.Store.MarkFailed(ctx, record.RecordID(), fmt.Sprintf("failed to transform record: %s", err), store.MarkFinalOptions{}); err != nil {
log15.Error("Failed to mark record as failed", "recordID", record.RecordID(), "error", err)
}

return apiclient.Job{}, false, err
}

now := m.clock.Now()
m.addMeta(executorName, jobMeta{queueName: queueName, record: record, started: now})
return job, true, nil
}

// addExecutionLogEntry calls AddExecutionLogEntry for the given job. If the job identifier
// is not known, a false-valued flag is returned.
func (m *handler) addExecutionLogEntry(ctx context.Context, queueName, executorName string, jobID int, entry workerutil.ExecutionLogEntry) error {
queueOptions, ok := m.options.QueueOptions[queueName]
if !ok {
return ErrUnknownQueue
}

_, err := m.findMeta(queueName, executorName, jobID, false)
if err != nil {
return err
}

if err := queueOptions.Store.AddExecutionLogEntry(ctx, jobID, entry); err != nil {
return err
}

return nil
// addExecutionLogEntry calls AddExecutionLogEntry for the given job.
func (h *handler) addExecutionLogEntry(ctx context.Context, executorName string, jobID int, entry workerutil.ExecutionLogEntry) error {
return h.Store.AddExecutionLogEntry(ctx, jobID, entry, store.AddExecutionLogEntryOptions{
// We pass the WorkerHostname, so the store enforces the record to be owned by this executor. When
// the previous executor didn't report heartbeats anymore, but is still alive and reporting logs,
// both executors that ever got the job would be writing to the same record. This prevents it.
WorkerHostname: executorName,
// We pass state to enforce adding log entries is only possible while the record is still dequeued.
State: "processing",
})
}

// markComplete calls MarkComplete for the given job, then commits the job's transaction.
// The job is removed from the executor's job list on success.
func (m *handler) markComplete(ctx context.Context, queueName, executorName string, jobID int) error {
queueOptions, ok := m.options.QueueOptions[queueName]
// markComplete calls MarkComplete for the given job.
func (h *handler) markComplete(ctx context.Context, executorName string, jobID int) error {
ok, err := h.Store.MarkComplete(ctx, jobID, store.MarkFinalOptions{
// We pass the WorkerHostname, so the store enforces the record to be owned by this executor. When
// the previous executor didn't report heartbeats anymore, but is still alive and reporting state,
// both executors that ever got the job would be writing to the same record. This prevents it.
WorkerHostname: executorName,
})
if !ok {
return ErrUnknownQueue
}

job, err := m.findMeta(queueName, executorName, jobID, true)
if err != nil {
return err
return ErrUnknownJob
}

_, err = queueOptions.Store.MarkComplete(ctx, job.record.RecordID())
return err
}

// markErrored calls MarkErrored for the given job, then commits the job's transaction.
// The job is removed from the executor's job list on success.
func (m *handler) markErrored(ctx context.Context, queueName, executorName string, jobID int, errorMessage string) error {
queueOptions, ok := m.options.QueueOptions[queueName]
// markErrored calls MarkErrored for the given job.
func (h *handler) markErrored(ctx context.Context, executorName string, jobID int, errorMessage string) error {
ok, err := h.Store.MarkErrored(ctx, jobID, errorMessage, store.MarkFinalOptions{
// We pass the WorkerHostname, so the store enforces the record to be owned by this executor. When
// the previous executor didn't report heartbeats anymore, but is still alive and reporting state,
// both executors that ever got the job would be writing to the same record. This prevents it.
WorkerHostname: executorName,
})
if !ok {
return ErrUnknownQueue
}

job, err := m.findMeta(queueName, executorName, jobID, true)
if err != nil {
return err
return ErrUnknownJob
}

_, err = queueOptions.Store.MarkErrored(ctx, job.record.RecordID(), errorMessage)
return err
}

// markFailed calls MarkFailed for the given job, then commits the job's transaction.
// The job is removed from the executor's job list on success.
func (m *handler) markFailed(ctx context.Context, queueName, executorName string, jobID int, errorMessage string) error {
queueOptions, ok := m.options.QueueOptions[queueName]
// markFailed calls MarkFailed for the given job.
func (h *handler) markFailed(ctx context.Context, executorName string, jobID int, errorMessage string) error {
ok, err := h.Store.MarkFailed(ctx, jobID, errorMessage, store.MarkFinalOptions{
// We pass the WorkerHostname, so the store enforces the record to be owned by this executor. When
// the previous executor didn't report heartbeats anymore, but is still alive and reporting state,
// both executors that ever got the job would be writing to the same record. This prevents it.
WorkerHostname: executorName,
})
if !ok {
return ErrUnknownQueue
}

job, err := m.findMeta(queueName, executorName, jobID, true)
if err != nil {
return err
return ErrUnknownJob
}

_, err = queueOptions.Store.MarkFailed(ctx, job.record.RecordID(), errorMessage)
return err
}

// findMeta returns the job with the given id and executor name. If the job is
// unknown, an error is returned. If the remove parameter is true, the job will
// be removed from the executor's job list on success.
func (m *handler) findMeta(queueName, executorName string, jobID int, remove bool) (jobMeta, error) {
m.m.Lock()
defer m.m.Unlock()

executor, ok := m.executors[executorName]
if !ok {
return jobMeta{}, ErrUnknownJob
}

for i, job := range executor.jobs {
if job.queueName == queueName && job.record.RecordID() == jobID {
if remove {
l := len(executor.jobs) - 1
executor.jobs[i] = executor.jobs[l]
executor.jobs = executor.jobs[:l]
m.updateMetrics()
}

return job, nil
}
}

return jobMeta{}, ErrUnknownJob
}

// addMeta adds a job to the given executor's job list.
func (m *handler) addMeta(executorName string, job jobMeta) {
m.m.Lock()
defer m.m.Unlock()

executor, ok := m.executors[executorName]
if !ok {
executor = &executorMeta{}
m.executors[executorName] = executor
}

now := m.clock.Now()
executor.jobs = append(executor.jobs, job)
executor.lastUpdate = now
m.updateMetrics()
}

func (m *handler) updateMetrics() {
type queueStat struct {
JobIDs []int
ExecutorNames map[string]struct{}
}
queueStats := map[string]queueStat{}

for executorName, meta := range m.executors {
for _, job := range meta.jobs {
stat, ok := queueStats[job.queueName]
if !ok {
stat = queueStat{
ExecutorNames: map[string]struct{}{},
}
}

stat.JobIDs = append(stat.JobIDs, job.record.RecordID())
stat.ExecutorNames[executorName] = struct{}{}
queueStats[job.queueName] = stat
}
}

for queueName, temp := range queueStats {
m.queueMetrics.NumJobs.WithLabelValues(queueName).Set(float64(len(temp.JobIDs)))
m.queueMetrics.NumExecutors.WithLabelValues(queueName).Set(float64(len(temp.ExecutorNames)))
}
}
149 changes: 41 additions & 108 deletions enterprise/cmd/executor-queue/internal/server/handler_test.go
@@ -4,7 +4,6 @@ import (
"context"
"testing"

"github.com/derision-test/glock"
"github.com/google/go-cmp/cmp"

apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
@@ -35,14 +34,9 @@ func TestDequeue(t *testing.T) {
return transformedJob, nil
}

options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: store, RecordTransformer: recordTransformer},
},
}
handler := newHandler(options, glock.NewMockClock())
handler := newHandler(QueueOptions{Store: store, RecordTransformer: recordTransformer})

job, dequeued, err := handler.dequeue(context.Background(), "test_queue", "deadbeef", "test")
job, dequeued, err := handler.dequeue(context.Background(), "deadbeef", "test")
if err != nil {
t.Fatalf("unexpected error dequeueing job: %s", err)
}
@@ -58,14 +52,9 @@ func TestDequeue(t *testing.T) {
}

func TestDequeueNoRecord(t *testing.T) {
options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: workerstoremocks.NewMockStore()},
},
}
handler := newHandler(options, glock.NewMockClock())
handler := newHandler(QueueOptions{Store: workerstoremocks.NewMockStore()})

_, dequeued, err := handler.dequeue(context.Background(), "test_queue", "deadbeef", "test")
_, dequeued, err := handler.dequeue(context.Background(), "deadbeef", "test")
if err != nil {
t.Fatalf("unexpected error dequeueing job: %s", err)
}
@@ -74,30 +63,16 @@ func TestDequeueNoRecord(t *testing.T) {
}
}

func TestDequeueUnknownQueue(t *testing.T) {
options := Options{}
handler := newHandler(options, glock.NewMockClock())

if _, _, err := handler.dequeue(context.Background(), "test_queue", "deadbeef", "test"); err != ErrUnknownQueue {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownQueue, err)
}
}

func TestAddExecutionLogEntry(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueFunc.SetDefaultReturn(testRecord{ID: 42}, true, nil)
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}

options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: store, RecordTransformer: recordTransformer},
},
}
handler := newHandler(options, glock.NewMockClock())
handler := newHandler(QueueOptions{Store: store, RecordTransformer: recordTransformer})

job, dequeued, err := handler.dequeue(context.Background(), "test_queue", "deadbeef", "test")
job, dequeued, err := handler.dequeue(context.Background(), "deadbeef", "test")
if err != nil {
t.Fatalf("unexpected error dequeueing job: %s", err)
}
@@ -109,7 +84,7 @@ func TestAddExecutionLogEntry(t *testing.T) {
Command: []string{"ls", "-a"},
Out: "<log payload>",
}
if err := handler.addExecutionLogEntry(context.Background(), "test_queue", "deadbeef", job.ID, entry); err != nil {
if err := handler.addExecutionLogEntry(context.Background(), "deadbeef", job.ID, entry); err != nil {
t.Fatalf("unexpected error updating log contents: %s", err)
}

@@ -125,59 +100,39 @@ func TestAddExecutionLogEntry(t *testing.T) {
}
}

func TestAddExecutionLogEntryUnknownQueue(t *testing.T) {
options := Options{}
handler := newHandler(options, glock.NewMockClock())

entry := workerutil.ExecutionLogEntry{
Command: []string{"ls", "-a"},
Out: "<log payload>",
}
if err := handler.addExecutionLogEntry(context.Background(), "test_queue", "deadbjeef", 42, entry); err != ErrUnknownQueue {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownQueue, err)
}
}

func TestAddExecutionLogEntryUnknownJob(t *testing.T) {
options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: workerstoremocks.NewMockStore()},
},
}
handler := newHandler(options, glock.NewMockClock())
store := workerstoremocks.NewMockStore()
store.AddExecutionLogEntryFunc.SetDefaultReturn(ErrUnknownJob)
handler := newHandler(QueueOptions{Store: store})

entry := workerutil.ExecutionLogEntry{
Command: []string{"ls", "-a"},
Out: "<log payload>",
}
if err := handler.addExecutionLogEntry(context.Background(), "test_queue", "deadbeef", 42, entry); err != ErrUnknownJob {
if err := handler.addExecutionLogEntry(context.Background(), "deadbeef", 42, entry); err != ErrUnknownJob {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownJob, err)
}
}

func TestMarkComplete(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueFunc.SetDefaultReturn(testRecord{ID: 42}, true, nil)
store.MarkCompleteFunc.SetDefaultReturn(true, nil)
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}

options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: store, RecordTransformer: recordTransformer},
},
}
handler := newHandler(options, glock.NewMockClock())
handler := newHandler(QueueOptions{Store: store, RecordTransformer: recordTransformer})

job, dequeued, err := handler.dequeue(context.Background(), "test_queue", "deadbeef", "test")
job, dequeued, err := handler.dequeue(context.Background(), "deadbeef", "test")
if err != nil {
t.Fatalf("unexpected error dequeueing job: %s", err)
}
if !dequeued {
t.Fatalf("expected a job to be dequeued")
}

if err := handler.markComplete(context.Background(), "test_queue", "deadbeef", job.ID); err != nil {
if err := handler.markComplete(context.Background(), "deadbeef", job.ID); err != nil {
t.Fatalf("unexpected error completing job: %s", err)
}

@@ -191,50 +146,34 @@ func TestMarkComplete(t *testing.T) {
}

func TestMarkCompleteUnknownJob(t *testing.T) {
options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: workerstoremocks.NewMockStore()},
},
}
handler := newHandler(options, glock.NewMockClock())
store := workerstoremocks.NewMockStore()
store.MarkCompleteFunc.SetDefaultReturn(false, nil)
handler := newHandler(QueueOptions{Store: store})

if err := handler.markComplete(context.Background(), "test_queue", "deadbeef", 42); err != ErrUnknownJob {
if err := handler.markComplete(context.Background(), "deadbeef", 42); err != ErrUnknownJob {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownJob, err)
}
}

func TestMarkCompleteUnknownQueue(t *testing.T) {
options := Options{}
handler := newHandler(options, glock.NewMockClock())

if err := handler.markComplete(context.Background(), "test_queue", "deadbeef", 42); err != ErrUnknownQueue {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownQueue, err)
}
}

func TestMarkErrored(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueFunc.SetDefaultReturn(testRecord{ID: 42}, true, nil)
store.MarkErroredFunc.SetDefaultReturn(true, nil)
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}

options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: store, RecordTransformer: recordTransformer},
},
}
handler := newHandler(options, glock.NewMockClock())
handler := newHandler(QueueOptions{Store: store, RecordTransformer: recordTransformer})

job, dequeued, err := handler.dequeue(context.Background(), "test_queue", "deadbeef", "test")
job, dequeued, err := handler.dequeue(context.Background(), "deadbeef", "test")
if err != nil {
t.Fatalf("unexpected error dequeueing job: %s", err)
}
if !dequeued {
t.Fatalf("expected a job to be dequeued")
}

if err := handler.markErrored(context.Background(), "test_queue", "deadbeef", job.ID, "OH NO"); err != nil {
if err := handler.markErrored(context.Background(), "deadbeef", job.ID, "OH NO"); err != nil {
t.Fatalf("unexpected error completing job: %s", err)
}

@@ -251,50 +190,34 @@ func TestMarkErrored(t *testing.T) {
}

func TestMarkErroredUnknownJob(t *testing.T) {
options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: workerstoremocks.NewMockStore()},
},
}
handler := newHandler(options, glock.NewMockClock())
store := workerstoremocks.NewMockStore()
store.MarkErroredFunc.SetDefaultReturn(false, nil)
handler := newHandler(QueueOptions{Store: store})

if err := handler.markErrored(context.Background(), "test_queue", "deadbeef", 42, "OH NO"); err != ErrUnknownJob {
if err := handler.markErrored(context.Background(), "deadbeef", 42, "OH NO"); err != ErrUnknownJob {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownJob, err)
}
}

func TestMarkErroredUnknownQueue(t *testing.T) {
options := Options{}
handler := newHandler(options, glock.NewMockClock())

if err := handler.markErrored(context.Background(), "test_queue", "deadbeef", 42, "OH NO"); err != ErrUnknownQueue {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownQueue, err)
}
}

func TestMarkFailed(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.DequeueFunc.SetDefaultReturn(testRecord{ID: 42}, true, nil)
store.MarkFailedFunc.SetDefaultReturn(true, nil)
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: 42}, nil
}

options := Options{
QueueOptions: map[string]QueueOptions{
"test_queue": {Store: store, RecordTransformer: recordTransformer},
},
}
handler := newHandler(options, glock.NewMockClock())
handler := newHandler(QueueOptions{Store: store, RecordTransformer: recordTransformer})

job, dequeued, err := handler.dequeue(context.Background(), "test_queue", "deadbeef", "test")
job, dequeued, err := handler.dequeue(context.Background(), "deadbeef", "test")
if err != nil {
t.Fatalf("unexpected error dequeueing job: %s", err)
}
if !dequeued {
t.Fatalf("expected a job to be dequeued")
}

if err := handler.markFailed(context.Background(), "test_queue", "deadbeef", job.ID, "OH NO"); err != nil {
if err := handler.markFailed(context.Background(), "deadbeef", job.ID, "OH NO"); err != nil {
t.Fatalf("unexpected error completing job: %s", err)
}

@@ -310,6 +233,16 @@ func TestMarkFailed(t *testing.T) {
}
}

func TestMarkFailedUnknownJob(t *testing.T) {
store := workerstoremocks.NewMockStore()
store.MarkFailedFunc.SetDefaultReturn(false, nil)
handler := newHandler(QueueOptions{Store: store})

if err := handler.markFailed(context.Background(), "deadbeef", 42, "OH NO"); err != ErrUnknownJob {
t.Fatalf("unexpected error. want=%q have=%q", ErrUnknownJob, err)
}
}

type testRecord struct {
ID int
Payload string
127 changes: 17 additions & 110 deletions enterprise/cmd/executor-queue/internal/server/lifecycle.go
@@ -3,123 +3,30 @@ package server
import (
"context"

"github.com/hashicorp/go-multierror"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)

// heartbeat will release the transaction for any job that is not confirmed to be in-progress
// by the given executor. This method is called when the executor POSTs its in-progress job
// identifiers to the /heartbeat route. This method returns the set of identifiers which the
// executor erroneously claims to hold (and are sent back as a hint to stop processing).
func (h *handler) heartbeat(ctx context.Context, executorName string, jobIDs []int) ([]int, error) {
deadJobs, unknownIDs, err := h.heartbeatJobs(ctx, executorName, jobIDs)
err2 := h.requeueJobs(ctx, deadJobs)
if err != nil && err2 != nil {
err = multierror.Append(err, err2)
// heartbeat is called when the executor POSTs its in-progress job identifiers to the /heartbeat route.
// This method returns the set of identifiers which the executor erroneously claims to hold (and are sent
// back as a hint to stop processing).
// The set of job identifiers assigned to the given executor are getting a heartbeat, indicating they're
// still being worked on.
func (h *handler) heartbeat(ctx context.Context, executorName string, ids []int) (unknownInQueueJobs []int, errs error) {
knownIDs, err := h.Store.Heartbeat(ctx, ids, store.HeartbeatOptions{WorkerHostname: executorName})
if err != nil {
return nil, err
}
if err2 != nil {
err = err2
knownIDsMap := map[int]struct{}{}
for _, id := range knownIDs {
knownIDsMap[id] = struct{}{}
}
return unknownIDs, err
}

// cleanup will release the transactions held by any executor that has not sent a heartbeat
// in a while. This method is called periodically in the background.
func (h *handler) cleanup(ctx context.Context) error {
return h.requeueJobs(ctx, h.pruneExecutors())
}

// heartbeatJobs updates the set of job identifiers assigned to the given executor and returns
// any job that was known to us but not reported by the executor, plus the set of job identifiers
// reported by the executor which do not have an associated record held by this instance of the
// executor queue. This can occur when the executor-queue restarts and loses its in-memory state.
// We send these identifiers back to the executor as a hint to stop processing.
func (h *handler) heartbeatJobs(ctx context.Context, executorName string, ids []int) (dead []jobMeta, unknownIDs []int, errs error) {
now := h.clock.Now()

executorIDsMap := map[int]struct{}{}
unknownInQueueJobs = make([]int, 0)
for _, id := range ids {
executorIDsMap[id] = struct{}{}
}

h.m.Lock()
defer h.m.Unlock()

executor, ok := h.executors[executorName]
if !ok {
executor = &executorMeta{}
h.executors[executorName] = executor
}

executorQueueIDsMap := map[int]struct{}{}
var live []jobMeta
for _, job := range executor.jobs {
executorQueueIDsMap[job.record.RecordID()] = struct{}{}
if _, ok := executorIDsMap[job.record.RecordID()]; ok || now.Sub(job.started) < h.options.UnreportedMaxAge {
live = append(live, job)
if err := h.heartbeatJob(ctx, job); err != nil {
errs = multierror.Append(errs, err)
}
} else {
dead = append(dead, job)
if _, ok := knownIDsMap[id]; !ok {
unknownInQueueJobs = append(unknownInQueueJobs, id)
}
}
executor.jobs = live
executor.lastUpdate = now

unknownIDs = make([]int, 0, len(ids))
for _, id := range ids {
if _, ok := executorQueueIDsMap[id]; !ok {
unknownIDs = append(unknownIDs, id)
}
}

return dead, unknownIDs, errs
}

// pruneExecutors will release the transactions held by any executor that has not sent a
// heartbeat in a while and return the attached jobs.
func (h *handler) pruneExecutors() (jobs []jobMeta) {
h.m.Lock()
defer h.m.Unlock()

for name, executor := range h.executors {
if h.clock.Now().Sub(executor.lastUpdate) <= h.options.DeathThreshold {
continue
}

jobs = append(jobs, executor.jobs...)
delete(h.executors, name)
}

return jobs
}

func (h *handler) heartbeatJob(ctx context.Context, job jobMeta) error {
queueOptions, ok := h.options.QueueOptions[job.queueName]
if !ok {
return ErrUnknownQueue
}

return queueOptions.Store.Heartbeat(ctx, job.record.RecordID())
}

// requeueJobs releases and requeues each of the given jobs.
func (h *handler) requeueJobs(ctx context.Context, jobs []jobMeta) (errs error) {
for _, job := range jobs {
if err := h.requeueJob(ctx, job); err != nil {
errs = multierror.Append(errs, err)
}
}

return errs
}

// requeueJob requeues the given job and releases the associated transaction.
func (h *handler) requeueJob(ctx context.Context, job jobMeta) error {
queueOptions, ok := h.options.QueueOptions[job.queueName]
if !ok {
return ErrUnknownQueue
}

return queueOptions.Store.Requeue(ctx, job.record.RecordID(), h.clock.Now().Add(h.options.RequeueDelay))
return unknownInQueueJobs, errs
}
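
As the comment on heartbeat above notes, the handler echoes back any job IDs the executor claims to hold that the queue no longer knows about, as a hint to stop processing them. A hypothetical executor-side loop (not the actual executor code in this repository) could consume that hint roughly as follows:

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// heartbeatLoop is a hypothetical executor-side loop, not the real executor
// implementation: it periodically reports the IDs of locally running jobs and
// cancels any job the queue reports back as unknown.
func heartbeatLoop(
	ctx context.Context,
	interval time.Duration,
	inProgress func() []int, // IDs of jobs currently running locally
	report func(ctx context.Context, ids []int) ([]int, error), // e.g. POST /{queueName}/heartbeat
	cancel func(id int), // stop a local job
) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return
		case <-ticker.C:
			unknown, err := report(ctx, inProgress())
			if err != nil {
				continue // transient failure: keep jobs running and retry next tick
			}
			for _, id := range unknown {
				cancel(id) // the queue no longer tracks this job; stop working on it
			}
		}
	}
}

func main() {
	ctx, stop := context.WithTimeout(context.Background(), 250*time.Millisecond)
	defer stop()

	running := []int{41, 43, 45}
	report := func(_ context.Context, ids []int) ([]int, error) {
		// Pretend the queue only knows about jobs 41 and 43.
		var unknown []int
		for _, id := range ids {
			if id != 41 && id != 43 {
				unknown = append(unknown, id)
			}
		}
		return unknown, nil
	}
	cancel := func(id int) { fmt.Println("cancelling job", id) } // prints "cancelling job 45" each tick

	heartbeatLoop(ctx, 100*time.Millisecond, func() []int { return running }, report, cancel)
}
```
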
118 changes: 32 additions & 86 deletions enterprise/cmd/executor-queue/internal/server/lifecycle_test.go
@@ -3,135 +3,81 @@ package server
import (
"context"
"testing"
"time"

"github.com/derision-test/glock"
"github.com/google/go-cmp/cmp"

apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
"github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
workerstoremocks "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store/mocks"
)

func TestHeartbeat(t *testing.T) {
store1 := workerstoremocks.NewMockStore()
store2 := workerstoremocks.NewMockStore()
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: record.RecordID()}, nil
}

store1.DequeueFunc.PushReturn(testRecord{ID: 41}, true, nil)
store1.DequeueFunc.PushReturn(testRecord{ID: 42}, true, nil)
store2.DequeueFunc.PushReturn(testRecord{ID: 43}, true, nil)
store2.DequeueFunc.PushReturn(testRecord{ID: 44}, true, nil)
store1.DequeueFunc.PushReturn(testRecord{ID: 43}, true, nil)
store1.DequeueFunc.PushReturn(testRecord{ID: 44}, true, nil)
store1.HeartbeatFunc.SetDefaultHook(func(ctx context.Context, ids []int, options store.HeartbeatOptions) ([]int, error) {
knownIDs := make([]int, 0)
for _, id := range ids {
if id >= 41 && id <= 44 {
knownIDs = append(knownIDs, id)
}
}

options := Options{
QueueOptions: map[string]QueueOptions{
"q1": {Store: store1, RecordTransformer: recordTransformer},
"q2": {Store: store2, RecordTransformer: recordTransformer},
},
UnreportedMaxAge: time.Second,
}
clock := glock.NewMockClock()
handler := newHandler(options, clock)
return knownIDs, nil
})

handler := newHandler(QueueOptions{Store: store1, RecordTransformer: recordTransformer})

_, dequeued1, _ := handler.dequeue(context.Background(), "q1", "deadbeef", "test")
_, dequeued2, _ := handler.dequeue(context.Background(), "q1", "deadveal", "test")
_, dequeued3, _ := handler.dequeue(context.Background(), "q2", "deadbeef", "test")
_, dequeued4, _ := handler.dequeue(context.Background(), "q2", "deadveal", "test")
_, dequeued1, _ := handler.dequeue(context.Background(), "deadbeef", "test")
_, dequeued2, _ := handler.dequeue(context.Background(), "deadveal", "test")
_, dequeued3, _ := handler.dequeue(context.Background(), "deadbeef", "test")
_, dequeued4, _ := handler.dequeue(context.Background(), "deadveal", "test")
if !dequeued1 || !dequeued2 || !dequeued3 || !dequeued4 {
t.Fatalf("failed to dequeue records")
}

// missing all jobs, but they're less than UnreportedMaxAge
clock.Advance(time.Second / 2)
if _, err := handler.heartbeat(context.Background(), "deadbeef", []int{}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
}
if _, err := handler.heartbeat(context.Background(), "deadveal", []int{}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
}

// missing no jobs
clock.Advance(time.Minute * 2)
if _, err := handler.heartbeat(context.Background(), "deadbeef", []int{41, 43}); err != nil {
if ids, err := handler.heartbeat(context.Background(), "deadbeef", []int{41, 43}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
} else if diff := cmp.Diff(ids, []int{}); diff != "" {
t.Fatalf("invalid unknownIDs returned diff=%s", diff)
}
if _, err := handler.heartbeat(context.Background(), "deadveal", []int{42, 44}); err != nil {
if ids, err := handler.heartbeat(context.Background(), "deadveal", []int{42, 44}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
} else if diff := cmp.Diff(ids, []int{}); diff != "" {
t.Fatalf("invalid unknownIDs returned diff=%s", diff)
}

// missing one deadbeef jobs
clock.Advance(time.Minute * 2)
if _, err := handler.heartbeat(context.Background(), "deadbeef", []int{41}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
}
if _, err := handler.heartbeat(context.Background(), "deadveal", []int{42, 44}); err != nil {
if ids, err := handler.heartbeat(context.Background(), "deadbeef", []int{41}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
} else if diff := cmp.Diff(ids, []int{}); diff != "" {
t.Fatalf("invalid unknownIDs returned diff=%s", diff)
}

// missing two deadveal jobs
clock.Advance(time.Minute * 2)
if _, err := handler.heartbeat(context.Background(), "deadbeef", []int{41}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
}
if _, err := handler.heartbeat(context.Background(), "deadveal", []int{}); err != nil {
if ids, err := handler.heartbeat(context.Background(), "deadbeef", []int{}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
} else if diff := cmp.Diff(ids, []int{}); diff != "" {
t.Fatalf("invalid unknownIDs returned diff=%s", diff)
}

// unknown jobs
clock.Advance(time.Minute * 2)
if unknownIDs, err := handler.heartbeat(context.Background(), "deadbeef", []int{41, 43, 45}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
} else if diff := cmp.Diff([]int{43, 45}, unknownIDs); diff != "" {
} else if diff := cmp.Diff([]int{45}, unknownIDs); diff != "" {
t.Errorf("unexpected unknown ids (-want +got):\n%s", diff)
}
if unknownIDs, err := handler.heartbeat(context.Background(), "deadveal", []int{42, 44, 45}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
} else if diff := cmp.Diff([]int{42, 44, 45}, unknownIDs); diff != "" {
} else if diff := cmp.Diff([]int{45}, unknownIDs); diff != "" {
t.Errorf("unexpected unknown ids (-want +got):\n%s", diff)
}
}

func TestCleanup(t *testing.T) {
store1 := workerstoremocks.NewMockStore()
store2 := workerstoremocks.NewMockStore()
recordTransformer := func(ctx context.Context, record workerutil.Record) (apiclient.Job, error) {
return apiclient.Job{ID: record.RecordID()}, nil
}

store1.DequeueFunc.PushReturn(testRecord{ID: 41}, true, nil)
store1.DequeueFunc.PushReturn(testRecord{ID: 42}, true, nil)
store2.DequeueFunc.PushReturn(testRecord{ID: 43}, true, nil)
store2.DequeueFunc.PushReturn(testRecord{ID: 44}, true, nil)

options := Options{
QueueOptions: map[string]QueueOptions{
"q1": {Store: store1, RecordTransformer: recordTransformer},
"q2": {Store: store2, RecordTransformer: recordTransformer},
},
DeathThreshold: time.Minute * 5,
}
clock := glock.NewMockClock()
handler := newHandler(options, clock)

_, dequeued1, _ := handler.dequeue(context.Background(), "q1", "deadbeef", "test")
_, dequeued2, _ := handler.dequeue(context.Background(), "q1", "deadveal", "test")
_, dequeued3, _ := handler.dequeue(context.Background(), "q2", "deadbeef", "test")
_, dequeued4, _ := handler.dequeue(context.Background(), "q2", "deadveal", "test")
if !dequeued1 || !dequeued2 || !dequeued3 || !dequeued4 {
t.Fatalf("failed to dequeue records")
}

for i := 0; i < 6; i++ {
clock.Advance(time.Minute)

if _, err := handler.heartbeat(context.Background(), "deadbeef", []int{41, 43}); err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
}
}

if err := handler.cleanup(context.Background()); err != nil {
t.Fatalf("unexpected error performing cleanup: %s", err)
}
}
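
TestCleanup advances the clock past DeathThreshold while only the deadbeef jobs keep heartbeating, so a cleanup pass should reap the silent deadveal jobs. A rough sketch of that age check, assuming a plain map of last-heartbeat times rather than the handler's actual state:

package main

import (
    "fmt"
    "time"
)

// staleJobs returns the IDs whose last heartbeat is older than deathThreshold.
// These are the jobs a cleanup pass would hand back to the queue.
func staleJobs(lastHeartbeat map[int]time.Time, now time.Time, deathThreshold time.Duration) []int {
    var stale []int
    for id, seen := range lastHeartbeat {
        if now.Sub(seen) >= deathThreshold {
            stale = append(stale, id)
        }
    }
    return stale
}

func main() {
    now := time.Now()
    last := map[int]time.Time{
        41: now.Add(-1 * time.Minute), // recently heartbeated
        42: now.Add(-6 * time.Minute), // silent past the threshold
    }
    fmt.Println(staleJobs(last, now, 5*time.Minute)) // [42]
}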
38 changes: 0 additions & 38 deletions enterprise/cmd/executor-queue/internal/server/observability.go

This file was deleted.

48 changes: 24 additions & 24 deletions enterprise/cmd/executor-queue/internal/server/routes.go
@@ -7,40 +7,40 @@ import (
"io"
"net/http"
"regexp"
"strings"

"github.com/gorilla/mux"
"github.com/inconshreveable/log15"

apiclient "github.com/sourcegraph/sourcegraph/enterprise/internal/executor"
)

func (h *handler) setupRoutes(router *mux.Router) {
var names []string
for queueName := range h.options.QueueOptions {
names = append(names, regexp.QuoteMeta(queueName))
}

routes := map[string]func(w http.ResponseWriter, r *http.Request){
"dequeue": h.handleDequeue,
"addExecutionLogEntry": h.handleAddExecutionLogEntry,
"markComplete": h.handleMarkComplete,
"markErrored": h.handleMarkErrored,
"markFailed": h.handleMarkFailed,
}
for path, handler := range routes {
router.Path(fmt.Sprintf("/{queueName:(?:%s)}/%s", strings.Join(names, "|"), path)).Methods("POST").HandlerFunc(handler)
func setupRoutes(queueOptionsMap map[string]QueueOptions) func(router *mux.Router) {
return func(router *mux.Router) {
for name, queueOptions := range queueOptionsMap {
h := newHandler(queueOptions)

subRouter := router.PathPrefix(fmt.Sprintf("/{queueName:(?:%s)}/", regexp.QuoteMeta(name))).Subrouter()
routes := map[string]func(w http.ResponseWriter, r *http.Request){
"dequeue": h.handleDequeue,
"addExecutionLogEntry": h.handleAddExecutionLogEntry,
"markComplete": h.handleMarkComplete,
"markErrored": h.handleMarkErrored,
"markFailed": h.handleMarkFailed,
"heartbeat": h.handleHeartbeat,
}
for path, handler := range routes {
subRouter.Path(fmt.Sprintf("/%s", path)).Methods("POST").HandlerFunc(handler)
}
}
}

router.Path("/heartbeat").Methods("POST").HandlerFunc(h.handleHeartbeat)
}

// POST /{queueName}/dequeue
func (h *handler) handleDequeue(w http.ResponseWriter, r *http.Request) {
var payload apiclient.DequeueRequest

h.wrapHandler(w, r, &payload, func() (int, interface{}, error) {
job, dequeued, err := h.dequeue(r.Context(), mux.Vars(r)["queueName"], payload.ExecutorName, payload.ExecutorHostname)
job, dequeued, err := h.dequeue(r.Context(), payload.ExecutorName, payload.ExecutorHostname)
if !dequeued {
return http.StatusNoContent, nil, err
}
@@ -54,7 +54,7 @@ func (h *handler) handleAddExecutionLogEntry(w http.ResponseWriter, r *http.Requ
var payload apiclient.AddExecutionLogEntryRequest

h.wrapHandler(w, r, &payload, func() (int, interface{}, error) {
err := h.addExecutionLogEntry(r.Context(), mux.Vars(r)["queueName"], payload.ExecutorName, payload.JobID, payload.ExecutionLogEntry)
err := h.addExecutionLogEntry(r.Context(), payload.ExecutorName, payload.JobID, payload.ExecutionLogEntry)
return http.StatusNoContent, nil, err
})
}
@@ -64,7 +64,7 @@ func (h *handler) handleMarkComplete(w http.ResponseWriter, r *http.Request) {
var payload apiclient.MarkCompleteRequest

h.wrapHandler(w, r, &payload, func() (int, interface{}, error) {
err := h.markComplete(r.Context(), mux.Vars(r)["queueName"], payload.ExecutorName, payload.JobID)
err := h.markComplete(r.Context(), payload.ExecutorName, payload.JobID)
if err == ErrUnknownJob {
return http.StatusNotFound, nil, nil
}
@@ -78,7 +78,7 @@ func (h *handler) handleMarkErrored(w http.ResponseWriter, r *http.Request) {
var payload apiclient.MarkErroredRequest

h.wrapHandler(w, r, &payload, func() (int, interface{}, error) {
err := h.markErrored(r.Context(), mux.Vars(r)["queueName"], payload.ExecutorName, payload.JobID, payload.ErrorMessage)
err := h.markErrored(r.Context(), payload.ExecutorName, payload.JobID, payload.ErrorMessage)
if err == ErrUnknownJob {
return http.StatusNotFound, nil, nil
}
@@ -92,7 +92,7 @@ func (h *handler) handleMarkFailed(w http.ResponseWriter, r *http.Request) {
var payload apiclient.MarkErroredRequest

h.wrapHandler(w, r, &payload, func() (int, interface{}, error) {
err := h.markFailed(r.Context(), mux.Vars(r)["queueName"], payload.ExecutorName, payload.JobID, payload.ErrorMessage)
err := h.markFailed(r.Context(), payload.ExecutorName, payload.JobID, payload.ErrorMessage)
if err == ErrUnknownJob {
return http.StatusNotFound, nil, nil
}
@@ -101,7 +101,7 @@ func (h *handler) handleMarkFailed(w http.ResponseWriter, r *http.Request) {
})
}

// POST /heartbeat
// POST /{queueName}/heartbeat
func (h *handler) handleHeartbeat(w http.ResponseWriter, r *http.Request) {
var payload apiclient.HeartbeatRequest
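
setupRoutes now mounts one sub-router per queue, with the queue name baked into the path prefix. As a standalone illustration of the same gorilla/mux pattern (the queue name and no-op handlers below are invented for the example):

package main

import (
    "fmt"
    "net/http"
    "regexp"

    "github.com/gorilla/mux"
)

// registerQueue mounts a POST handler for each action under /{queueName}/<action>,
// restricting queueName to the single literal queue name, much like setupRoutes does.
func registerQueue(router *mux.Router, queueName string, actions map[string]http.HandlerFunc) {
    prefix := fmt.Sprintf("/{queueName:(?:%s)}/", regexp.QuoteMeta(queueName))
    sub := router.PathPrefix(prefix).Subrouter()
    for action, handler := range actions {
        sub.Path("/" + action).Methods("POST").HandlerFunc(handler)
    }
}

func main() {
    router := mux.NewRouter()
    registerQueue(router, "batches", map[string]http.HandlerFunc{
        "dequeue":   func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) },
        "heartbeat": func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) },
    })
    _ = http.ListenAndServe(":8080", router) // e.g. POST /batches/heartbeat
}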

27 changes: 10 additions & 17 deletions enterprise/cmd/executor-queue/internal/server/server.go
@@ -1,32 +1,25 @@
package server

import (
"context"
"fmt"
"net/http"

"github.com/derision-test/glock"
"github.com/inconshreveable/log15"

"github.com/sourcegraph/sourcegraph/internal/goroutine"
"github.com/sourcegraph/sourcegraph/internal/httpserver"
"github.com/sourcegraph/sourcegraph/internal/observation"
"github.com/sourcegraph/sourcegraph/internal/trace/ot"
)

// ServerOptions captures the options required for setting up an executor queue
// server.
type ServerOptions struct {
Port int
}

// NewServer returns an HTTP job queue server.
func NewServer(options Options, observationContext *observation.Context) goroutine.BackgroundRoutine {
func NewServer(options ServerOptions, queueOptions map[string]QueueOptions) goroutine.BackgroundRoutine {
addr := fmt.Sprintf(":%d", options.Port)
handler := newHandlerWithMetrics(options, glock.NewRealClock(), observationContext)
httpHandler := ot.Middleware(httpserver.NewHandler(handler.setupRoutes))
router := setupRoutes(queueOptions)
httpHandler := ot.Middleware(httpserver.NewHandler(router))
server := httpserver.NewFromAddr(addr, &http.Server{Handler: httpHandler})
janitor := goroutine.NewPeriodicGoroutine(context.Background(), options.CleanupInterval, &handlerWrapper{handler})
return goroutine.CombinedRoutine{server, janitor}
return server
}

type handlerWrapper struct{ handler *handler }

var _ goroutine.Handler = &handlerWrapper{}

func (hw *handlerWrapper) Handle(ctx context.Context) error { return hw.handler.cleanup(ctx) }
func (hw *handlerWrapper) HandleError(err error) { log15.Error("Failed to requeue jobs", "err", err) }
2 changes: 1 addition & 1 deletion enterprise/cmd/executor-queue/main.go
@@ -97,7 +97,7 @@ func main() {
}))
}

server := apiserver.NewServer(serviceConfig.ServerOptions(queueOptions), observationContext)
server := apiserver.NewServer(serviceConfig.ServerOptions(), queueOptions)
goroutine.MonitorBackgroundRoutines(context.Background(), server)
}

7 changes: 6 additions & 1 deletion enterprise/cmd/executor/config.go
@@ -10,6 +10,7 @@ import (
"github.com/sourcegraph/sourcegraph/enterprise/cmd/executor/internal/command"
apiworker "github.com/sourcegraph/sourcegraph/enterprise/cmd/executor/internal/worker"
"github.com/sourcegraph/sourcegraph/internal/env"
"github.com/sourcegraph/sourcegraph/internal/hostname"
"github.com/sourcegraph/sourcegraph/internal/workerutil"
)

@@ -97,8 +98,12 @@ func (c *Config) ResourceOptions() command.ResourceOptions {
}

func (c *Config) ClientOptions(transport http.RoundTripper) apiclient.Options {
hn := hostname.Get()

return apiclient.Options{
ExecutorName: uuid.New().String(),
// Be unique but also descriptive.
ExecutorName: hn + "-" + uuid.New().String(),
ExecutorHostname: hn,
PathPrefix: "/.executors/queue",
EndpointOptions: c.EndpointOptions(),
BaseClientOptions: c.BaseClientOptions(transport),
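
The executor now names itself after its host plus a unique suffix, so log lines stay attributable while two executors on one machine remain distinguishable. A small standard-library sketch of the same idea (the real code uses the hostname package and a UUID; the random hex suffix here is only a stand-in):

package main

import (
    "crypto/rand"
    "encoding/hex"
    "fmt"
    "os"
)

// executorName returns "<hostname>-<random suffix>": descriptive enough to spot
// in logs, unique enough to distinguish two executors on the same machine.
func executorName() string {
    host, err := os.Hostname()
    if err != nil {
        host = "unknown-host"
    }
    suffix := make([]byte, 8)
    _, _ = rand.Read(suffix)
    return host + "-" + hex.EncodeToString(suffix)
}

func main() {
    fmt.Println(executorName()) // e.g. "executor-01-3f9c2a1d5e7b8c0a"
}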
9 changes: 5 additions & 4 deletions enterprise/cmd/executor/internal/apiclient/client.go
@@ -151,8 +151,8 @@ func (c *Client) MarkFailed(ctx context.Context, queueName string, jobID int, er
return c.client.DoAndDrop(ctx, req)
}

func (c *Client) Ping(ctx context.Context, jobIDs []int) (err error) {
req, err := c.makeRequest("POST", "heartbeat", executor.HeartbeatRequest{
func (c *Client) Ping(ctx context.Context, queueName string, jobIDs []int) (err error) {
req, err := c.makeRequest("POST", fmt.Sprintf("%s/heartbeat", queueName), executor.HeartbeatRequest{
ExecutorName: c.options.ExecutorName,
})
if err != nil {
@@ -162,13 +162,14 @@ func (c *Client) Ping(ctx context.Context, jobIDs []int) (err error) {
return c.client.DoAndDrop(ctx, req)
}

func (c *Client) Heartbeat(ctx context.Context, jobIDs []int) (unknownIDs []int, err error) {
func (c *Client) Heartbeat(ctx context.Context, queueName string, jobIDs []int) (unknownIDs []int, err error) {
ctx, endObservation := c.operations.heartbeat.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.String("queueName", queueName),
log.String("jobIDs", intsToString(jobIDs)),
}})
defer endObservation(1, observation.Args{})

req, err := c.makeRequest("POST", "heartbeat", executor.HeartbeatRequest{
req, err := c.makeRequest("POST", fmt.Sprintf("%s/heartbeat", queueName), executor.HeartbeatRequest{
ExecutorName: c.options.ExecutorName,
JobIDs: jobIDs,
})
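
Ping and Heartbeat are now queue-scoped, so the request path carries the queue name. A simplified sketch of posting that payload with net/http (endpoint, base URL, and payload shape are illustrative, not the real apiclient):

package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

type heartbeatRequest struct {
    ExecutorName string `json:"executorName"`
    JobIDs       []int  `json:"jobIds"`
}

// postHeartbeat POSTs the executor's in-flight job IDs to
// <base>/.executors/queue/<queueName>/heartbeat.
func postHeartbeat(base, queueName, executorName string, jobIDs []int) error {
    body, err := json.Marshal(heartbeatRequest{ExecutorName: executorName, JobIDs: jobIDs})
    if err != nil {
        return err
    }
    url := fmt.Sprintf("%s/.executors/queue/%s/heartbeat", base, queueName)
    resp, err := http.Post(url, "application/json", bytes.NewReader(body))
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode >= 300 {
        return fmt.Errorf("unexpected status %d", resp.StatusCode)
    }
    return nil
}

func main() {
    _ = postHeartbeat("http://localhost:3080", "batches", "deadbeef", []int{1, 2, 3})
}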
8 changes: 4 additions & 4 deletions enterprise/cmd/executor/internal/apiclient/client_test.go
@@ -247,7 +247,7 @@ func TestMarkFailed(t *testing.T) {
func TestHeartbeat(t *testing.T) {
spec := routeSpec{
expectedMethod: "POST",
expectedPath: "/.executors/queue/heartbeat",
expectedPath: "/.executors/queue/test_queue/heartbeat",
expectedUsername: "test",
expectedPassword: "hunter2",
expectedPayload: `{"executorName": "deadbeef", "jobIds": [1, 2, 3]}`,
@@ -256,7 +256,7 @@ func TestHeartbeat(t *testing.T) {
}

testRoute(t, spec, func(client *Client) {
unknownIDs, err := client.Heartbeat(context.Background(), []int{1, 2, 3})
unknownIDs, err := client.Heartbeat(context.Background(), "test_queue", []int{1, 2, 3})
if err != nil {
t.Fatalf("unexpected error performing heartbeat: %s", err)
}
@@ -270,7 +270,7 @@ func TestHeartbeat(t *testing.T) {
func TestHeartbeatBadResponse(t *testing.T) {
spec := routeSpec{
expectedMethod: "POST",
expectedPath: "/.executors/queue/heartbeat",
expectedPath: "/.executors/queue/test_queue/heartbeat",
expectedUsername: "test",
expectedPassword: "hunter2",
expectedPayload: `{"executorName": "deadbeef", "jobIds": [1, 2, 3]}`,
@@ -279,7 +279,7 @@ func TestHeartbeatBadResponse(t *testing.T) {
}

testRoute(t, spec, func(client *Client) {
if _, err := client.Heartbeat(context.Background(), []int{1, 2, 3}); err == nil {
if _, err := client.Heartbeat(context.Background(), "test_queue", []int{1, 2, 3}); err == nil {
t.Fatalf("expected an error")
}
})
43 changes: 23 additions & 20 deletions enterprise/cmd/executor/internal/worker/mock_store_test.go
4 changes: 2 additions & 2 deletions enterprise/cmd/executor/internal/worker/store.go
@@ -38,9 +38,9 @@ func (s *storeShim) Dequeue(ctx context.Context, workerHostname string, extraArg
return job, dequeued, nil
}

func (s *storeShim) Heartbeat(ctx context.Context, id int) error {
func (s *storeShim) Heartbeat(ctx context.Context, ids []int) ([]int, error) {
// Not needed; we do bulk updates from the executor.
return nil
return nil, nil
}

func (s *storeShim) AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry) error {
8 changes: 4 additions & 4 deletions enterprise/cmd/executor/internal/worker/worker.go
@@ -75,9 +75,9 @@ func NewWorker(options Options, observationContext *observation.Context) gorouti
runnerFactory: command.NewRunner,
}

indexer := workerutil.NewWorker(context.Background(), store, handler, options.WorkerOptions)
worker := workerutil.NewWorker(context.Background(), store, handler, options.WorkerOptions)
heartbeat := goroutine.NewHandlerWithErrorMessage("heartbeat", func(ctx context.Context) error {
unknownIDs, err := queueStore.Heartbeat(ctx, idSet.Slice())
unknownIDs, err := queueStore.Heartbeat(ctx, options.QueueName, idSet.Slice())
if err != nil {
return err
}
@@ -90,7 +90,7 @@ func NewWorker(options Options, observationContext *observation.Context) gorouti
})

return goroutine.CombinedRoutine{
indexer,
worker,
goroutine.NewPeriodicGoroutine(context.Background(), options.HeartbeatInterval, heartbeat),
}
}
@@ -111,7 +111,7 @@ func connectToFrontend(queueStore *apiclient.Client, options Options) bool {
defer signal.Stop(signals)

for {
err := queueStore.Ping(context.Background(), nil)
err := queueStore.Ping(context.Background(), options.QueueName, nil)
if err == nil {
log15.Info("Connected to Sourcegraph instance")
return true
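
The heartbeat goroutine reports the worker's in-flight IDs and receives back the IDs the queue no longer recognizes. One plausible way to act on that response, sketched here rather than taken from the actual worker code, is to cancel the local handlers for those jobs:

package main

import (
    "context"
    "fmt"
)

// cancelUnknown cancels the local execution of any job the queue reported as
// unknown, so the worker stops doing work that can no longer be recorded.
func cancelUnknown(unknownIDs []int, cancels map[int]context.CancelFunc) {
    for _, id := range unknownIDs {
        if cancel, ok := cancels[id]; ok {
            cancel()
            delete(cancels, id)
            fmt.Printf("canceled job %d: no longer known to the queue\n", id)
        }
    }
}

func main() {
    ctx41, cancel41 := context.WithCancel(context.Background())
    cancels := map[int]context.CancelFunc{41: cancel41}

    cancelUnknown([]int{41}, cancels)
    fmt.Println(ctx41.Err()) // context.Canceled
}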
@@ -63,7 +63,7 @@ func newInternalProxyHandler(uploadHandler http.Handler) (func() http.Handler, e
base.Path("/git/{rest:.*/(?:info/refs|git-upload-pack)}").Handler(reverseProxy(frontendOrigin))

// Proxy only the known routes in the executor queue API
base.Path("/queue/{rest:heartbeat|.*/(?:dequeue|addExecutionLogEntry|markComplete|markErrored|markFailed)}").Handler(reverseProxy(queueOrigin))
base.Path("/queue/{rest:.*/(?:dequeue|addExecutionLogEntry|markComplete|markErrored|markFailed|heartbeat)}").Handler(reverseProxy(queueOrigin))

// Upload LSIF indexes without a sudo access token or github tokens
base.Path("/lsif/upload").Methods("POST").Handler(uploadHandler)

Large diffs are not rendered by default.

8 changes: 4 additions & 4 deletions enterprise/internal/batches/background/executor_store.go
Original file line number Diff line number Diff line change
@@ -64,20 +64,20 @@ type executorStore struct {
const markCompleteQuery = `
UPDATE batch_spec_executions
SET state = 'completed', finished_at = clock_timestamp(), batch_spec_id = (SELECT id FROM batch_specs WHERE rand_id = %s)
WHERE id = %s AND state = 'processing'
WHERE id = %s AND state = 'processing' AND worker_hostname = %s
RETURNING id
`

func (s *executorStore) MarkComplete(ctx context.Context, id int) (_ bool, err error) {
func (s *executorStore) MarkComplete(ctx context.Context, id int, options dbworkerstore.MarkFinalOptions) (_ bool, err error) {
batchesStore := store.New(s.Store.Handle().DB(), nil)

batchSpecRandID, err := loadAndExtractBatchSpecRandID(ctx, batchesStore, int64(id))
if err != nil {
// If we couldn't extract the batch spec rand id, we mark the job as failed
return s.Store.MarkFailed(ctx, id, fmt.Sprintf("failed to extract batch spec ID: %s", err))
return s.Store.MarkFailed(ctx, id, fmt.Sprintf("failed to extract batch spec ID: %s", err), options)
}

_, ok, err := basestore.ScanFirstInt(batchesStore.Query(ctx, sqlf.Sprintf(markCompleteQuery, batchSpecRandID, id)))
_, ok, err := basestore.ScanFirstInt(batchesStore.Query(ctx, sqlf.Sprintf(markCompleteQuery, batchSpecRandID, id, options.WorkerHostname)))
return ok, err
}
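
The batches executor store now refuses to finalize a record unless the caller's worker_hostname matches, so a job re-dequeued elsewhere cannot be completed by its previous executor. A sketch of composing that guard with sqlf (the helper is illustrative; the table and columns are the ones from the query above):

package main

import (
    "fmt"

    "github.com/keegancsmith/sqlf"
)

// markCompleteQuery only succeeds when the record is still processing and is
// owned by the given worker hostname.
func markCompleteQuery(id int, workerHostname string) *sqlf.Query {
    return sqlf.Sprintf(`
UPDATE batch_spec_executions
SET state = 'completed', finished_at = clock_timestamp()
WHERE id = %s AND state = 'processing' AND worker_hostname = %s
RETURNING id`, id, workerHostname)
}

func main() {
    q := markCompleteQuery(7, "executor-01-3f9c2a1d")
    fmt.Println(q.Query(sqlf.PostgresBindVar)) // ... WHERE id = $1 AND ... worker_hostname = $2 ...
    fmt.Println(q.Args())                      // [7 executor-01-3f9c2a1d]
}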

@@ -56,7 +56,7 @@ stdout: {"operation":"CREATING_BATCH_SPEC","timestamp":"2021-07-06T09:38:51.535Z
}

for _, e := range entries {
err := workStore.AddExecutionLogEntry(context.Background(), int(specExec.ID), e)
err := workStore.AddExecutionLogEntry(context.Background(), int(specExec.ID), e, dbworkerstore.AddExecutionLogEntryOptions{})
if err != nil {
t.Fatal(err)
}
2 changes: 2 additions & 0 deletions internal/database/schema.md
@@ -318,6 +318,7 @@ Referenced by:
trigger_event | integer | | |
worker_hostname | text | | not null | ''::text
last_heartbeat_at | timestamp with time zone | | |
execution_logs | json[] | | |
Indexes:
"cm_action_jobs_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
@@ -437,6 +438,7 @@ Foreign-key constraints:
num_results | integer | | |
worker_hostname | text | | not null | ''::text
last_heartbeat_at | timestamp with time zone | | |
execution_logs | json[] | | |
Indexes:
"cm_trigger_jobs_pkey" PRIMARY KEY, btree (id)
Foreign-key constraints:
15 changes: 12 additions & 3 deletions internal/workerutil/dbworker/store/helpers_test.go
@@ -7,6 +7,7 @@ import (

"github.com/derision-test/glock"
"github.com/keegancsmith/sqlf"
"github.com/lib/pq"

"github.com/sourcegraph/sourcegraph/internal/database/basestore"
"github.com/sourcegraph/sourcegraph/internal/database/dbtesting"
@@ -20,8 +21,9 @@ func testStore(db dbutil.DB, options Options) *store {
}

type TestRecord struct {
ID int
State string
ID int
State string
ExecutionLogs []ExecutionLogEntry
}

func (v TestRecord) RecordID() int {
@@ -36,7 +38,7 @@ func testScanFirstRecord(rows *sql.Rows, queryErr error) (v workerutil.Record, _

if rows.Next() {
var record TestRecord
if err := rows.Scan(&record.ID, &record.State); err != nil {
if err := rows.Scan(&record.ID, &record.State, pq.Array(&record.ExecutionLogs)); err != nil {
return nil, false, err
}

@@ -146,6 +148,7 @@ func defaultTestStoreOptions(clock glock.Clock) Options {
ColumnExpressions: []*sqlf.Query{
sqlf.Sprintf("w.id"),
sqlf.Sprintf("w.state"),
sqlf.Sprintf("w.execution_logs"),
},
StalledMaxAge: time.Second * 5,
MaxNumResets: 5,
@@ -170,6 +173,12 @@ func assertDequeueRecordResult(t *testing.T, expectedID int, record interface{},
}
}

func assertDequeueRecordResultLogCount(t *testing.T, expectedLogCount int, record interface{}) {
if val := len(record.(TestRecord).ExecutionLogs); val != expectedLogCount {
t.Errorf("unexpected count of logs. want=%d have=%d", expectedLogCount, val)
}
}

func assertDequeueRecordViewResult(t *testing.T, expectedID, expectedNewField int, record interface{}, ok bool, err error) {
if err != nil {
t.Fatalf("unexpected error: %s", err)
156 changes: 87 additions & 69 deletions internal/workerutil/dbworker/store/mocks/mock_store.go

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions internal/workerutil/dbworker/store/observability.go
@@ -16,6 +16,7 @@ type operations struct {
markErrored *observation.Operation
markFailed *observation.Operation
resetStalled *observation.Operation
heartbeat *observation.Operation
}

func newOperations(storeName string, observationContext *observation.Context) *operations {
@@ -43,5 +44,6 @@ func newOperations(storeName string, observationContext *observation.Context) *o
markErrored: op("MarkErrored"),
markFailed: op("MarkFailed"),
resetStalled: op("ResetStalled"),
heartbeat: op("Heartbeat"),
}
}
152 changes: 125 additions & 27 deletions internal/workerutil/dbworker/store/store.go
@@ -19,6 +19,50 @@ import (
"github.com/sourcegraph/sourcegraph/internal/workerutil"
)

type HeartbeatOptions struct {
// WorkerHostname, if set, constrains the update to records whose worker_hostname matches this value.
WorkerHostname string
}

func (o *HeartbeatOptions) ToSQLConds(formatQuery func(query string, args ...interface{}) *sqlf.Query) []*sqlf.Query {
conds := []*sqlf.Query{}
if o.WorkerHostname != "" {
conds = append(conds, formatQuery("{worker_hostname} = %s", o.WorkerHostname))
}
return conds
}

type AddExecutionLogEntryOptions struct {
// WorkerHostname, if set, constrains the update to records whose worker_hostname matches this value.
WorkerHostname string
// State, if set, constrains the update to records whose state matches this value.
State string
}

func (o *AddExecutionLogEntryOptions) ToSQLConds(formatQuery func(query string, args ...interface{}) *sqlf.Query) []*sqlf.Query {
conds := []*sqlf.Query{}
if o.WorkerHostname != "" {
conds = append(conds, formatQuery("{worker_hostname} = %s", o.WorkerHostname))
}
if o.State != "" {
conds = append(conds, formatQuery("{state} = %s", o.State))
}
return conds
}

type MarkFinalOptions struct {
// WorkerHostname, if set, constrains the update to records whose worker_hostname matches this value.
WorkerHostname string
}

func (o *MarkFinalOptions) ToSQLConds(formatQuery func(query string, args ...interface{}) *sqlf.Query) []*sqlf.Query {
conds := []*sqlf.Query{}
if o.WorkerHostname != "" {
conds = append(conds, formatQuery("{worker_hostname} = %s", o.WorkerHostname))
}
return conds
}
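
Each options struct contributes optional WHERE conditions through ToSQLConds, and the store joins them with AND so unset options simply add no constraint. A standalone illustration of that composition using sqlf directly (the formatQuery brace expansion is dropped for brevity):

package main

import (
    "fmt"

    "github.com/keegancsmith/sqlf"
)

func heartbeatConds(ids []int, workerHostname string) *sqlf.Query {
    idQueries := make([]*sqlf.Query, 0, len(ids))
    for _, id := range ids {
        idQueries = append(idQueries, sqlf.Sprintf("%s", id))
    }

    conds := []*sqlf.Query{
        sqlf.Sprintf("id IN (%s)", sqlf.Join(idQueries, ",")),
        sqlf.Sprintf("state = 'processing'"),
    }
    // Optional conditions are appended only when set, exactly like ToSQLConds.
    if workerHostname != "" {
        conds = append(conds, sqlf.Sprintf("worker_hostname = %s", workerHostname))
    }
    return sqlf.Join(conds, " AND ")
}

func main() {
    q := heartbeatConds([]int{41, 43}, "executor-01")
    fmt.Println(q.Query(sqlf.PostgresBindVar)) // id IN ($1,$2) AND state = 'processing' AND worker_hostname = $3
    fmt.Println(q.Args())                      // [41 43 executor-01]
}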

// Store is the persistence layer for the dbworker package that handles worker-side operations backed by a Postgres
// database. See Options for details on the required shape of the database tables (e.g. table column names/types).
type Store interface {
@@ -35,29 +79,29 @@ type Store interface {
Dequeue(ctx context.Context, workerHostname string, conditions []*sqlf.Query) (workerutil.Record, bool, error)

// Heartbeat marks the given record as currently being processed.
Heartbeat(ctx context.Context, id int) error
Heartbeat(ctx context.Context, ids []int, options HeartbeatOptions) (knownIDs []int, err error)

// Requeue updates the state of the record with the given identifier to queued and adds a processing delay before
// the next dequeue of this record can be performed.
Requeue(ctx context.Context, id int, after time.Time) error

// AddExecutionLogEntry adds an executor log entry to the record.
AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry) error
AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry, options AddExecutionLogEntryOptions) error

// MarkComplete attempts to update the state of the record to complete. If this record has already been moved from
// the processing state to a terminal state, this method will have no effect. This method returns a boolean flag
// indicating if the record was updated.
MarkComplete(ctx context.Context, id int) (bool, error)
MarkComplete(ctx context.Context, id int, options MarkFinalOptions) (bool, error)

// MarkErrored attempts to update the state of the record to errored. This method will only have an effect
// if the current state of the record is processing or completed. A requeued record or a record already marked
// with an error will not be updated. This method returns a boolean flag indicating if the record was updated.
MarkErrored(ctx context.Context, id int, failureMessage string) (bool, error)
MarkErrored(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (bool, error)

// MarkFailed attempts to update the state of the record to failed. This method will only have an effect
// if the current state of the record is processing or completed. A requeued record or a record already marked
// with an error will not be updated. This method returns a boolean flag indicating if the record was updated.
MarkFailed(ctx context.Context, id int, failureMessage string) (bool, error)
MarkFailed(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (bool, error)

// ResetStalled moves all processing records that have not received a heartbeat within `StalledMaxAge` back to the
// queued state. In order to prevent input that continually crashes worker instances, records that have been reset
@@ -379,6 +423,7 @@ SET
{last_heartbeat_at} = %s,
{finished_at} = NULL,
{failure_message} = NULL,
{execution_logs} = NULL,
{worker_hostname} = %s
WHERE {id} IN (SELECT {id} FROM candidate)
RETURNING {id}
@@ -389,21 +434,51 @@ const selectRecordQuery = `
SELECT %s FROM %s WHERE {id} = %s
`

func (s *store) Heartbeat(ctx context.Context, id int) error {
err := s.Exec(ctx, s.formatQuery(updateCandidateQuery, quote(s.options.TableName), s.now(), id))
if err != nil {
if err != ctx.Err() {
return err
}
func (s *store) Heartbeat(ctx context.Context, ids []int, options HeartbeatOptions) (knownIDs []int, err error) {
ctx, endObservation := s.operations.heartbeat.With(ctx, &err, observation.Args{})
defer endObservation(1, observation.Args{})

if len(ids) == 0 {
return []int{}, nil
}

sqlIDs := make([]*sqlf.Query, 0, len(ids))
for _, id := range ids {
sqlIDs = append(sqlIDs, sqlf.Sprintf("%s", id))
}

quotedTableName := quote(s.options.TableName)

conds := []*sqlf.Query{
s.formatQuery("{id} in (%s)", sqlf.Join(sqlIDs, "")),
s.formatQuery("{state} = 'processing'"),
}
return nil
conds = append(conds, options.ToSQLConds(s.formatQuery)...)

knownIDs, err = basestore.ScanInts(s.Query(ctx, s.formatQuery(updateCandidateQuery, quotedTableName, sqlf.Join(conds, "AND"), quotedTableName, s.now())))
return knownIDs, err
}

const updateCandidateQuery = `
-- source: internal/workerutil/store.go:Heartbeat
UPDATE %s
SET {last_heartbeat_at} = %s
WHERE {id} = %s AND {state} = 'processing'
WITH alive_candidates AS (
SELECT
id
FROM
%s
WHERE
%s
ORDER BY
id ASC
FOR UPDATE
)
UPDATE
%s
SET
{last_heartbeat_at} = %s
WHERE
id IN (SELECT id FROM alive_candidates)
RETURNING id
`
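
Callers of the new Heartbeat pass the full set of in-flight IDs plus options and receive the IDs that were actually touched; anything missing from the result is no longer processing under that worker. A usage sketch against the Store interface above (store construction is elided; this only compiles inside the repository):

package example

import (
    "context"
    "log"

    "github.com/sourcegraph/sourcegraph/internal/workerutil/dbworker/store"
)

// refreshHeartbeats reports the given in-flight job IDs for one worker and logs
// any ID the table no longer holds in the 'processing' state for that worker.
func refreshHeartbeats(ctx context.Context, s store.Store, workerHostname string, ids []int) {
    knownIDs, err := s.Heartbeat(ctx, ids, store.HeartbeatOptions{WorkerHostname: workerHostname})
    if err != nil {
        log.Printf("heartbeat failed: %v", err)
        return
    }
    known := map[int]struct{}{}
    for _, id := range knownIDs {
        known[id] = struct{}{}
    }
    for _, id := range ids {
        if _, ok := known[id]; !ok {
            log.Printf("job %d is no longer processing under this worker", id)
        }
    }
}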

// Requeue updates the state of the record with the given identifier to queued and adds a processing delay before
@@ -431,58 +506,75 @@ WHERE {id} = %s
`

// AddExecutionLogEntry adds an executor log entry to the record.
func (s *store) AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry) (err error) {
func (s *store) AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry, options AddExecutionLogEntryOptions) (err error) {
ctx, endObservation := s.operations.addExecutionLogEntry.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.Int("id", id),
}})
defer endObservation(1, observation.Args{})

conds := []*sqlf.Query{
s.formatQuery("{id} = %s", id),
}
conds = append(conds, options.ToSQLConds(s.formatQuery)...)

return s.Exec(ctx, s.formatQuery(
addExecutionLogEntryQuery,
quote(s.options.TableName),
ExecutionLogEntry(entry),
id,
sqlf.Join(conds, "AND"),
))
}

const addExecutionLogEntryQuery = `
-- source: internal/workerutil/store.go:AddExecutionLogEntry
UPDATE %s
SET {execution_logs} = {execution_logs} || %s::json
WHERE {id} = %s
WHERE %s
`

// MarkComplete attempts to update the state of the record to complete. If this record has already been moved from
// the processing state to a terminal state, this method will have no effect. This method returns a boolean flag
// indicating if the record was updated.
func (s *store) MarkComplete(ctx context.Context, id int) (_ bool, err error) {
func (s *store) MarkComplete(ctx context.Context, id int, options MarkFinalOptions) (_ bool, err error) {
ctx, endObservation := s.operations.markComplete.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.Int("id", id),
}})
defer endObservation(1, observation.Args{})

_, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(markCompleteQuery, quote(s.options.TableName), id)))
conds := []*sqlf.Query{
s.formatQuery("{id} = %s", id),
s.formatQuery("{state} = 'processing'"),
}
conds = append(conds, options.ToSQLConds(s.formatQuery)...)

_, ok, err := basestore.ScanFirstInt(s.Query(ctx, s.formatQuery(markCompleteQuery, quote(s.options.TableName), sqlf.Join(conds, "AND"))))
return ok, err
}

const markCompleteQuery = `
-- source: internal/workerutil/store.go:MarkComplete
UPDATE %s
SET {state} = 'completed', {finished_at} = clock_timestamp()
WHERE {id} = %s AND {state} = 'processing'
WHERE %s
RETURNING {id}
`

// MarkErrored attempts to update the state of the record to errored. This method will only have an effect
// if the current state of the record is processing or completed. A requeued record or a record already marked
// with an error will not be updated. This method returns a boolean flag indicating if the record was updated.
func (s *store) MarkErrored(ctx context.Context, id int, failureMessage string) (_ bool, err error) {
func (s *store) MarkErrored(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (_ bool, err error) {
ctx, endObservation := s.operations.markErrored.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.Int("id", id),
}})
defer endObservation(1, observation.Args{})

q := s.formatQuery(markErroredQuery, quote(s.options.TableName), s.options.MaxNumRetries, failureMessage, id)
conds := []*sqlf.Query{
s.formatQuery("{id} = %s", id),
s.formatQuery("{state} = 'processing'"),
}
conds = append(conds, options.ToSQLConds(s.formatQuery)...)

q := s.formatQuery(markErroredQuery, quote(s.options.TableName), s.options.MaxNumRetries, failureMessage, sqlf.Join(conds, "AND"))
_, ok, err := basestore.ScanFirstInt(s.Query(ctx, q))
return ok, err
}
@@ -494,20 +586,26 @@ SET {state} = CASE WHEN {num_failures} + 1 = %d THEN 'failed' ELSE 'errored' END
{finished_at} = clock_timestamp(),
{failure_message} = %s,
{num_failures} = {num_failures} + 1
WHERE {id} = %s AND ({state} = 'processing' OR {state} = 'completed')
WHERE %s
RETURNING {id}
`

// MarkFailed attempts to update the state of the record to failed. This method will only have an effect
// if the current state of the record is processing or completed. A requeued record or a record already marked
// with an error will not be updated. This method returns a boolean flag indicating if the record was updated.
func (s *store) MarkFailed(ctx context.Context, id int, failureMessage string) (_ bool, err error) {
func (s *store) MarkFailed(ctx context.Context, id int, failureMessage string, options MarkFinalOptions) (_ bool, err error) {
ctx, endObservation := s.operations.markFailed.With(ctx, &err, observation.Args{LogFields: []log.Field{
log.Int("id", id),
}})
defer endObservation(1, observation.Args{})

q := s.formatQuery(markFailedQuery, quote(s.options.TableName), failureMessage, id)
conds := []*sqlf.Query{
s.formatQuery("{id} = %s", id),
s.formatQuery("{state} = 'processing'"),
}
conds = append(conds, options.ToSQLConds(s.formatQuery)...)

q := s.formatQuery(markFailedQuery, quote(s.options.TableName), failureMessage, sqlf.Join(conds, "AND"))
_, ok, err := basestore.ScanFirstInt(s.Query(ctx, q))
return ok, err
}
@@ -519,7 +617,7 @@ SET {state} = 'failed',
{finished_at} = clock_timestamp(),
{failure_message} = %s,
{num_failures} = {num_failures} + 1
WHERE {id} = %s AND ({state} = 'processing' OR {state} = 'completed')
WHERE %s
RETURNING {id}
`

48 changes: 32 additions & 16 deletions internal/workerutil/dbworker/store/store_test.go
@@ -149,6 +149,22 @@ func TestStoreDequeueConditions(t *testing.T) {
assertDequeueRecordResult(t, 3, record, ok, err)
}

func TestStoreDequeueResetExecutionLogs(t *testing.T) {
db := setupStoreTest(t)

if _, err := db.ExecContext(context.Background(), `
INSERT INTO workerutil_test (id, state, execution_logs, uploaded_at)
VALUES
(1, 'queued', E'{"{\\"key\\": \\"test\\"}"}', NOW() - '1 minute'::interval)
`); err != nil {
t.Fatalf("unexpected error inserting records: %s", err)
}

record, ok, err := testStore(db, defaultTestStoreOptions(nil)).Dequeue(context.Background(), "test", nil)
assertDequeueRecordResult(t, 1, record, ok, err)
assertDequeueRecordResultLogCount(t, 0, record)
}

func TestStoreDequeueDelay(t *testing.T) {
db := setupStoreTest(t)

@@ -384,7 +400,7 @@ func TestStoreAddExecutionLogEntry(t *testing.T) {
Command: command,
Out: payload,
}
if err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry); err != nil {
if err := testStore(db, defaultTestStoreOptions(nil)).AddExecutionLogEntry(context.Background(), 1, entry, AddExecutionLogEntryOptions{}); err != nil {
t.Fatalf("unexpected error adding executor log entry: %s", err)
}
}
@@ -424,7 +440,7 @@ func TestStoreMarkComplete(t *testing.T) {
t.Fatalf("unexpected error inserting records: %s", err)
}

marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1)
marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1, MarkFinalOptions{})
if err != nil {
t.Fatalf("unexpected error marking record as completed: %s", err)
}
@@ -466,7 +482,7 @@ func TestStoreMarkCompleteNotProcessing(t *testing.T) {
t.Fatalf("unexpected error inserting records: %s", err)
}

marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1)
marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkComplete(context.Background(), 1, MarkFinalOptions{})
if err != nil {
t.Fatalf("unexpected error marking record as completed: %s", err)
}
@@ -508,7 +524,7 @@ func TestStoreMarkErrored(t *testing.T) {
t.Fatalf("unexpected error inserting records: %s", err)
}

marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message")
marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})
if err != nil {
t.Fatalf("unexpected error marking record as errored: %s", err)
}
@@ -550,7 +566,7 @@ func TestStoreMarkFailed(t *testing.T) {
t.Fatalf("unexpected error inserting records: %s", err)
}

marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkFailed(context.Background(), 1, "new message")
marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkFailed(context.Background(), 1, "new message", MarkFinalOptions{})
if err != nil {
t.Fatalf("unexpected error marking upload as completed: %s", err)
}
@@ -592,12 +608,12 @@ func TestStoreMarkErroredAlreadyCompleted(t *testing.T) {
t.Fatalf("unexpected error inserting records: %s", err)
}

marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message")
marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})
if err != nil {
t.Fatalf("unexpected error marking record as errored: %s", err)
}
if !marked {
t.Fatalf("expected record to be marked")
if marked {
t.Fatalf("expected record not to be marked errired")
}

rows, err := db.QueryContext(context.Background(), `SELECT state, failure_message FROM workerutil_test WHERE id = 1`)
@@ -615,11 +631,11 @@ func TestStoreMarkErroredAlreadyCompleted(t *testing.T) {
if err := rows.Scan(&state, &failureMessage); err != nil {
t.Fatalf("unexpected error scanning record: %s", err)
}
if state != "errored" {
t.Errorf("unexpected state. want=%q have=%q", "errored", state)
if state != "completed" {
t.Errorf("unexpected state. want=%q have=%q", "completed", state)
}
if failureMessage == nil || *failureMessage != "new message" {
t.Errorf("unexpected failure message. want=%v have=%v", "new message", failureMessage)
if failureMessage != nil {
t.Errorf("unexpected non-empty failure message")
}
}

@@ -634,7 +650,7 @@ func TestStoreMarkErroredAlreadyErrored(t *testing.T) {
t.Fatalf("unexpected error inserting records: %s", err)
}

marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message")
marked, err := testStore(db, defaultTestStoreOptions(nil)).MarkErrored(context.Background(), 1, "new message", MarkFinalOptions{})
if err != nil {
t.Fatalf("unexpected error marking record as errored: %s", err)
}
@@ -682,7 +698,7 @@ func TestStoreMarkErroredRetriesExhausted(t *testing.T) {
store := testStore(db, options)

for i := 1; i < 3; i++ {
marked, err := store.MarkErrored(context.Background(), i, "new message")
marked, err := store.MarkErrored(context.Background(), i, "new message", MarkFinalOptions{})
if err != nil {
t.Fatalf("unexpected error marking record as errored: %s", err)
}
@@ -814,7 +830,7 @@ func TestStoreHeartbeat(t *testing.T) {
clock := glock.NewMockClockAt(now)
store := testStore(db, defaultTestStoreOptions(clock))

if err := store.Heartbeat(context.Background(), 1); err != nil {
if _, err := store.Heartbeat(context.Background(), []int{1}, HeartbeatOptions{}); err != nil {
t.Fatalf("unexpected error updating heartbeat: %s", err)
}

@@ -849,7 +865,7 @@ func TestStoreHeartbeat(t *testing.T) {
`); err != nil {
t.Fatalf("unexpected error updating records: %s", err)
}
if err := store.Heartbeat(context.Background(), 1); err != nil {
if _, err := store.Heartbeat(context.Background(), []int{1}, HeartbeatOptions{}); err != nil {
t.Fatalf("unexpected error updating heartbeat: %s", err)
}

20 changes: 20 additions & 0 deletions internal/workerutil/dbworker/store_shim.go
@@ -46,6 +46,26 @@ func (s *storeShim) Dequeue(ctx context.Context, workerHostname string, extraArg
return s.Store.Dequeue(ctx, workerHostname, conditions)
}

func (s *storeShim) Heartbeat(ctx context.Context, ids []int) (knownIDs []int, err error) {
return s.Store.Heartbeat(ctx, ids, store.HeartbeatOptions{})
}

func (s *storeShim) AddExecutionLogEntry(ctx context.Context, id int, entry workerutil.ExecutionLogEntry) error {
return s.Store.AddExecutionLogEntry(ctx, id, entry, store.AddExecutionLogEntryOptions{})
}

func (s *storeShim) MarkComplete(ctx context.Context, id int) (bool, error) {
return s.Store.MarkComplete(ctx, id, store.MarkFinalOptions{})
}

func (s *storeShim) MarkFailed(ctx context.Context, id int, failureMessage string) (bool, error) {
return s.Store.MarkFailed(ctx, id, failureMessage, store.MarkFinalOptions{})
}

func (s *storeShim) MarkErrored(ctx context.Context, id int, errorMessage string) (bool, error) {
return s.Store.MarkErrored(ctx, id, errorMessage, store.MarkFinalOptions{})
}

// ErrNotConditions occurs when a PreDequeue handler returns non-sql query extra arguments.
var ErrNotConditions = errors.New("expected slice of *sqlf.Query values")

43 changes: 23 additions & 20 deletions internal/workerutil/mock_store_test.go
5 changes: 3 additions & 2 deletions internal/workerutil/store.go
@@ -22,8 +22,9 @@ type Store interface {
// flag indicating the existence of a processable record.
Dequeue(ctx context.Context, workerHostname string, extraArguments interface{}) (Record, bool, error)

// Heartbeat marks the given record as currently being processed.
Heartbeat(ctx context.Context, id int) error
// Heartbeat updates last_heartbeat_at for each of the given jobs that is still processing. The IDs of all
// records that were touched are returned.
Heartbeat(ctx context.Context, jobIDs []int) (knownIDs []int, err error)

// AddExecutionLogEntry adds an executor log entry to the record.
AddExecutionLogEntry(ctx context.Context, id int, entry ExecutionLogEntry) error
2 changes: 1 addition & 1 deletion internal/workerutil/worker.go
@@ -183,7 +183,7 @@ func (w *Worker) dequeueAndHandle() (dequeued bool, err error) {
}

id := record.RecordID()
if err := w.store.Heartbeat(heartbeatCtx, id); err != nil {
if _, err := w.store.Heartbeat(heartbeatCtx, []int{id}); err != nil {
log15.Error("Failed to refresh last_heartbeat_at", "name", w.options.Name, "id", id, "error", err)
}
}
4 changes: 2 additions & 2 deletions internal/workerutil/worker_test.go
@@ -355,9 +355,9 @@ func TestWorkerDequeueHeartbeat(t *testing.T) {
}

heartbeats := make(chan struct{})
store.HeartbeatFunc.SetDefaultHook(func(c context.Context, i int) error {
store.HeartbeatFunc.SetDefaultHook(func(c context.Context, i []int) ([]int, error) {
heartbeats <- struct{}{}
return nil
return i, nil
})

worker := newWorker(context.Background(), store, handler, options, clock)
@@ -0,0 +1,6 @@
BEGIN;

ALTER TABLE IF EXISTS cm_trigger_jobs DROP COLUMN IF EXISTS execution_logs;
ALTER TABLE IF EXISTS cm_action_jobs DROP COLUMN IF EXISTS execution_logs;

COMMIT;
@@ -0,0 +1,6 @@
BEGIN;

ALTER TABLE IF EXISTS cm_trigger_jobs ADD COLUMN IF NOT EXISTS execution_logs JSON[];
ALTER TABLE IF EXISTS cm_action_jobs ADD COLUMN IF NOT EXISTS execution_logs JSON[];

COMMIT;
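
These migrations give the code-monitoring job tables the same execution_logs json[] column that the dbworker store appends to. A rough sketch of how one entry lands in that column, mirroring addExecutionLogEntryQuery (the DSN and the entry shape are placeholders):

package main

import (
    "database/sql"
    "encoding/json"
    "log"

    _ "github.com/lib/pq"
)

type executionLogEntry struct {
    Key     string   `json:"key"`
    Command []string `json:"command"`
    Out     string   `json:"out"`
}

func main() {
    db, err := sql.Open("postgres", "postgres://localhost/sourcegraph?sslmode=disable") // placeholder DSN
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    entry, _ := json.Marshal(executionLogEntry{Key: "step.0", Command: []string{"echo", "hi"}, Out: "hi\n"})

    // Append one JSON-encoded entry to the json[] column, as the store's
    // AddExecutionLogEntry query does.
    _, err = db.Exec(
        `UPDATE cm_trigger_jobs SET execution_logs = execution_logs || $1::json WHERE id = $2`,
        string(entry), 1,
    )
    if err != nil {
        log.Fatal(err)
    }
}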