diff --git a/example/submitqueue/orchestrator/server/BUILD.bazel b/example/submitqueue/orchestrator/server/BUILD.bazel index c3bc6df1..caacd86e 100644 --- a/example/submitqueue/orchestrator/server/BUILD.bazel +++ b/example/submitqueue/orchestrator/server/BUILD.bazel @@ -30,6 +30,8 @@ go_library( "//submitqueue/extension/changeprovider", "//submitqueue/extension/changeprovider/fake", "//submitqueue/extension/changeprovider/github", + "//submitqueue/extension/changeprovider/phabricator", + "//submitqueue/extension/changeprovider/routing", "//submitqueue/extension/conflict", "//submitqueue/extension/conflict/all", "//submitqueue/extension/conflict/fake", diff --git a/example/submitqueue/orchestrator/server/main.go b/example/submitqueue/orchestrator/server/main.go index 3478ffe6..2ed845c3 100644 --- a/example/submitqueue/orchestrator/server/main.go +++ b/example/submitqueue/orchestrator/server/main.go @@ -20,6 +20,7 @@ import ( "errors" "fmt" "net" + nethttp "net/http" "os" "os/signal" "sync" @@ -49,6 +50,8 @@ import ( "github.com/uber/submitqueue/submitqueue/extension/changeprovider" cpfake "github.com/uber/submitqueue/submitqueue/extension/changeprovider/fake" githubprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/github" + phabprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/phabricator" + routingprovider "github.com/uber/submitqueue/submitqueue/extension/changeprovider/routing" "github.com/uber/submitqueue/submitqueue/extension/conflict" "github.com/uber/submitqueue/submitqueue/extension/conflict/all" conflictfake "github.com/uber/submitqueue/submitqueue/extension/conflict/fake" @@ -757,16 +760,42 @@ func parseTimeout(envVal string, defaultVal time.Duration) time.Duration { return defaultVal } -// newChangeProvider creates a ChangeProvider for GitHub (github.com), configured -// via GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. When GITHUB_TOKEN is -// unset it returns the fake change provider (one empty ChangeInfo per URI unless -// a URI carries a failure marker, see changeprovider/fake). +// newChangeProvider creates a routing ChangeProvider containing GitHub and Phab ChangeProviders. +// When neither GITHUB_TOKEN nor PHAB_API_TOKEN is set, falls back to the fake change provider. func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.ChangeProvider, error) { - if os.Getenv("GITHUB_TOKEN") == "" { - logger.Warn("GITHUB_TOKEN not set; using fake change provider (empty change info unless URI-marked)") + ghProvider, err := newGitHubChangeProvider(logger, scope) + if err != nil { + return nil, err + } + + phabProvider, err := newPhabChangeProvider(logger, scope) + if err != nil { + return nil, err + } + + if ghProvider == nil && phabProvider == nil { + logger.Warn("no change provider tokens set; using fake change provider (empty change info unless URI-marked)") return cpfake.New(), nil } + routingProvider, err := routingprovider.NewProvider(routingprovider.Params{ + GitHub: ghProvider, + Phabricator: phabProvider, + }) + if err != nil { + return nil, fmt.Errorf("failed to create routing change provider: %w", err) + } + return routingProvider, nil +} + +// newGitHubChangeProvider creates a GitHub ChangeProvider configured via +// GITHUB_BASE_URL, GITHUB_TOKEN, and GITHUB_TIMEOUT. Returns nil when +// GITHUB_TOKEN is unset. +func newGitHubChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.ChangeProvider, error) { + if os.Getenv("GITHUB_TOKEN") == "" { + return nil, nil + } + client, err := http.NewClient(getEnv("GITHUB_BASE_URL", "https://api.github.com")) if err != nil { return nil, fmt.Errorf("failed to build GitHub HTTP client: %w", err) @@ -774,13 +803,57 @@ func newChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.Ch ts := oauth2.StaticTokenSource(&oauth2.Token{AccessToken: os.Getenv("GITHUB_TOKEN")}) client.Transport = &oauth2.Transport{Source: ts, Base: client.Transport} - client.Timeout = parseTimeout(os.Getenv("GITHUB_TIMEOUT"), 30*time.Second) return githubprovider.NewProvider(githubprovider.Params{ HTTPClient: client, Logger: logger.Sugar(), - MetricsScope: scope.SubScope("changeprovider"), + MetricsScope: scope.SubScope("changeprovider.github"), + }), nil +} + +// apiTokenTransport injects a Phabricator API token as a query parameter in each request. +type apiTokenTransport struct { + token string + next nethttp.RoundTripper +} + +func (t *apiTokenTransport) RoundTrip(req *nethttp.Request) (*nethttp.Response, error) { + r := req.Clone(req.Context()) + q := r.URL.Query() + q.Set("api.token", t.token) + r.URL.RawQuery = q.Encode() + return t.next.RoundTrip(r) +} + +// newPhabChangeProvider creates a Phabricator ChangeProvider configured via PHAB_API_ENDPOINT and PHAB_API_TOKEN. +// Returns nil when PHAB_API_TOKEN or PHAB_API_ENDPOINT are unset. +func newPhabChangeProvider(logger *zap.Logger, scope tally.Scope) (changeprovider.ChangeProvider, error) { + token := os.Getenv("PHAB_API_TOKEN") + if token == "" { + return nil, nil + } + + endpoint := os.Getenv("PHAB_API_ENDPOINT") + if endpoint == "" { + return nil, nil + } + + client, err := http.NewClient(endpoint) + if err != nil { + return nil, fmt.Errorf("failed to build Phabricator HTTP client: %w", err) + } + + baseTransport := client.Transport.(*http.BaseURLTransport) + baseTransport.Next = &apiTokenTransport{ + token: token, + next: baseTransport.Next, + } + + return phabprovider.NewProvider(phabprovider.Params{ + HTTPClient: client, + Logger: logger.Sugar(), + MetricsScope: scope.SubScope("changeprovider.phabricator"), }), nil } diff --git a/submitqueue/extension/changeprovider/routing/BUILD.bazel b/submitqueue/extension/changeprovider/routing/BUILD.bazel new file mode 100644 index 00000000..fbd3f015 --- /dev/null +++ b/submitqueue/extension/changeprovider/routing/BUILD.bazel @@ -0,0 +1,30 @@ +load("@rules_go//go:def.bzl", "go_library", "go_test") + +go_library( + name = "routing", + srcs = ["provider.go"], + importpath = "github.com/uber/submitqueue/submitqueue/extension/changeprovider/routing", + visibility = ["//visibility:public"], + deps = [ + "//platform/base/change/git", + "//platform/base/change/github", + "//platform/base/change/phabricator", + "//submitqueue/entity", + "//submitqueue/extension/changeprovider", + ], +) + +go_test( + name = "routing_test", + srcs = ["provider_test.go"], + embed = [":routing"], + deps = [ + "//platform/base/change", + "//submitqueue/entity", + "//submitqueue/extension/changeprovider", + "//submitqueue/extension/changeprovider/mock", + "@com_github_stretchr_testify//assert", + "@com_github_stretchr_testify//require", + "@org_uber_go_mock//gomock", + ], +) diff --git a/submitqueue/extension/changeprovider/routing/provider.go b/submitqueue/extension/changeprovider/routing/provider.go new file mode 100644 index 00000000..dccb170c --- /dev/null +++ b/submitqueue/extension/changeprovider/routing/provider.go @@ -0,0 +1,145 @@ +// Copyright (c) 2026 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package routing provides a ChangeProvider that dispatches URIs to +// downstream change providers based on the URI's change type. +package routing + +import ( + "context" + "fmt" + + "github.com/uber/submitqueue/platform/base/change/git" + "github.com/uber/submitqueue/platform/base/change/github" + "github.com/uber/submitqueue/platform/base/change/phabricator" + "github.com/uber/submitqueue/submitqueue/entity" + "github.com/uber/submitqueue/submitqueue/extension/changeprovider" +) + +// Params holds the optional downstream providers, keyed by change type. +// At least one must be non-nil. +type Params struct { + // GitHub handles URIs that parse as GitHub change IDs. + GitHub changeprovider.ChangeProvider + // Phabricator handles URIs that parse as Phabricator change IDs. + Phabricator changeprovider.ChangeProvider + // Git handles URIs that parse as git change IDs. + Git changeprovider.ChangeProvider +} + +// matchedURI pairs a URI with its original position in the input slice. +type matchedURI struct { + // index is the position of this URI in the original request.Change.URIs slice. + index int + // uri is the raw URI string. + uri string +} + +type provider struct { + github changeprovider.ChangeProvider + phabricator changeprovider.ChangeProvider + git changeprovider.ChangeProvider +} + +// NewProvider creates a ChangeProvider that routes URIs to the appropriate +// downstream provider based on the URI's change type. +// Returns an error if all providers are nil. +func NewProvider(params Params) (changeprovider.ChangeProvider, error) { + if params.GitHub == nil && params.Phabricator == nil && params.Git == nil { + return nil, fmt.Errorf("at least one change provider must be configured") + } + return &provider{ + github: params.GitHub, + phabricator: params.Phabricator, + git: params.Git, + }, nil +} + +// Get classifies each URI in the request by change type, groups them by provider, +// calls each provider once with its subset of changes, and reassembles results in the original order. +func (p *provider) Get(ctx context.Context, request entity.Request) ([]entity.ChangeInfo, error) { + changesByProvider, err := p.groupChangesByProvider(request.Change.URIs) + if err != nil { + return nil, err + } + + results := make([]entity.ChangeInfo, len(request.Change.URIs)) + for changeProvider, changes := range changesByProvider { + uris := make([]string, 0, len(changes)) + for _, c := range changes { + uris = append(uris, c.uri) + } + + // Subrequest for each provider containing only its subset of changes. + subRequest := request + subRequest.Change.URIs = uris + + infos, getErr := changeProvider.Get(ctx, subRequest) + if getErr != nil { + return nil, getErr + } + + if len(infos) != len(changes) { + return nil, fmt.Errorf("provider returned %d results for %d URIs", len(infos), len(changes)) + } + + // Put the changes back in their original positions in the results. + for i, changeInfo := range infos { + results[changes[i].index] = changeInfo + } + } + + return results, nil +} + +// groupChangesByProvider classifies each URI by trying ParseChangeID functions and groups them by the matched provider. +// Returns an error if a URI matches no known type or matches a type whose provider was not configured. +func (p *provider) groupChangesByProvider(uris []string) (map[changeprovider.ChangeProvider][]matchedURI, error) { + grouped := make(map[changeprovider.ChangeProvider][]matchedURI) + for i, uri := range uris { + matchedProvider, err := p.matchURIToChangeProvider(uri) + if err != nil { + return nil, err + } + grouped[matchedProvider] = append(grouped[matchedProvider], matchedURI{ + index: i, + uri: uri, + }) + } + + return grouped, nil +} + +// matchURIToChangeProvider returns the provider for the given URI by trying each ParseChangeID function. +// Returns an error if no parser matches, or if a parser matches, but the corresponding provider was not configured. +func (p *provider) matchURIToChangeProvider(uri string) (changeprovider.ChangeProvider, error) { + if _, err := github.ParseChangeID(uri); err == nil { + if p.github == nil { + return nil, fmt.Errorf("URI %q is a GitHub change but no GitHub provider is configured", uri) + } + return p.github, nil + } else if _, err := phabricator.ParseChangeID(uri); err == nil { + if p.phabricator == nil { + return nil, fmt.Errorf("URI %q is a Phabricator change but no Phabricator provider is configured", uri) + } + return p.phabricator, nil + } else if _, err := git.ParseChangeID(uri); err == nil { + if p.git == nil { + return nil, fmt.Errorf("URI %q is a git change but no git provider is configured", uri) + } + return p.git, nil + } + + return nil, fmt.Errorf("URI %q does not match any known change type", uri) +} diff --git a/submitqueue/extension/changeprovider/routing/provider_test.go b/submitqueue/extension/changeprovider/routing/provider_test.go new file mode 100644 index 00000000..2cc15190 --- /dev/null +++ b/submitqueue/extension/changeprovider/routing/provider_test.go @@ -0,0 +1,359 @@ +// Copyright (c) 2026 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package routing + +import ( + "context" + "fmt" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/uber/submitqueue/submitqueue/extension/changeprovider" + + "github.com/uber/submitqueue/platform/base/change" + "github.com/uber/submitqueue/submitqueue/entity" + changeprovmock "github.com/uber/submitqueue/submitqueue/extension/changeprovider/mock" + "go.uber.org/mock/gomock" +) + +const ( + githubURI1 = "github://uber/repo/pull/1/aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa" + githubURI2 = "github://uber/repo/pull/2/bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb" + phabURI1 = "phab://D123/456" + phabURI2 = "phab://D789/101" + gitURI1 = "git://remote.example.com/uber/repo/refs%2Fheads%2Fmain/cccccccccccccccccccccccccccccccccccccccc" +) + +func newRequest(uris ...string) entity.Request { + return entity.Request{ + ID: "test-queue/1", + Queue: "test-queue", + Change: change.Change{ + URIs: uris, + }, + } +} + +func changeInfoFor(uri string) entity.ChangeInfo { + return entity.ChangeInfo{URI: uri} +} + +func TestNewProvider(t *testing.T) { + ctrl := gomock.NewController(t) + ghProvider := changeprovmock.NewMockChangeProvider(ctrl) + + tests := []struct { + name string + params Params + wantErrMsg string + }{ + { + name: "github only", + params: Params{GitHub: ghProvider}, + }, + { + name: "all nil returns error", + params: Params{}, + wantErrMsg: "at least one change provider must be configured", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := NewProvider(tt.params) + if tt.wantErrMsg != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErrMsg) + assert.Nil(t, got) + return + } + require.NoError(t, err) + assert.NotNil(t, got) + }) + } +} + +func TestGet(t *testing.T) { + tests := []struct { + name string + uris []string + setup func(gh, phab *changeprovmock.MockChangeProvider) + params func(gh, phab *changeprovmock.MockChangeProvider) Params + want []entity.ChangeInfo + wantErrMsg string + }{ + { + name: "single provider all URIs", + uris: []string{githubURI1, githubURI2}, + setup: func(gh, _ *changeprovmock.MockChangeProvider) { + gh.EXPECT().Get(gomock.Any(), gomock.Any()).Return( + []entity.ChangeInfo{changeInfoFor(githubURI1), changeInfoFor(githubURI2)}, nil, + ) + }, + params: func(gh, _ *changeprovmock.MockChangeProvider) Params { + return Params{GitHub: gh} + }, + want: []entity.ChangeInfo{changeInfoFor(githubURI1), changeInfoFor(githubURI2)}, + }, + { + name: "mixed URIs routed to different providers and results reassembled in order", + uris: []string{githubURI1, phabURI1, githubURI2, phabURI2}, + setup: func(gh, phab *changeprovmock.MockChangeProvider) { + gh.EXPECT().Get(gomock.Any(), newRequest(githubURI1, githubURI2)).Return( + []entity.ChangeInfo{changeInfoFor(githubURI1), changeInfoFor(githubURI2)}, nil, + ) + phab.EXPECT().Get(gomock.Any(), newRequest(phabURI1, phabURI2)).Return( + []entity.ChangeInfo{changeInfoFor(phabURI1), changeInfoFor(phabURI2)}, nil, + ) + }, + params: func(gh, phab *changeprovmock.MockChangeProvider) Params { + return Params{GitHub: gh, Phabricator: phab} + }, + want: []entity.ChangeInfo{ + changeInfoFor(githubURI1), + changeInfoFor(phabURI1), + changeInfoFor(githubURI2), + changeInfoFor(phabURI2), + }, + }, + { + name: "downstream provider error is propagated", + uris: []string{githubURI1}, + setup: func(gh, _ *changeprovmock.MockChangeProvider) { + gh.EXPECT().Get(gomock.Any(), gomock.Any()).Return(nil, fmt.Errorf("github API error")) + }, + params: func(gh, _ *changeprovmock.MockChangeProvider) Params { + return Params{GitHub: gh} + }, + wantErrMsg: "github API error", + }, + { + name: "provider returns wrong result count", + uris: []string{githubURI1, githubURI2}, + setup: func(gh, _ *changeprovmock.MockChangeProvider) { + gh.EXPECT().Get(gomock.Any(), gomock.Any()).Return( + []entity.ChangeInfo{changeInfoFor(githubURI1)}, nil, + ) + }, + params: func(gh, _ *changeprovmock.MockChangeProvider) Params { + return Params{GitHub: gh} + }, + wantErrMsg: "provider returned 1 results for 2 URIs", + }, + { + name: "unrecognized URI fails before calling any provider", + uris: []string{"bogus://nope"}, + setup: func(_, _ *changeprovmock.MockChangeProvider) {}, + params: func(gh, _ *changeprovmock.MockChangeProvider) Params { return Params{GitHub: gh} }, + wantErrMsg: "does not match any known change type", + }, + { + name: "empty URIs returns empty results", + uris: nil, + setup: func(_, _ *changeprovmock.MockChangeProvider) {}, + params: func(gh, _ *changeprovmock.MockChangeProvider) Params { + return Params{GitHub: gh} + }, + want: []entity.ChangeInfo{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + ctrl := gomock.NewController(t) + ghMock := changeprovmock.NewMockChangeProvider(ctrl) + phabMock := changeprovmock.NewMockChangeProvider(ctrl) + + tt.setup(ghMock, phabMock) + p, err := NewProvider(tt.params(ghMock, phabMock)) + require.NoError(t, err) + + got, err := p.Get(context.Background(), newRequest(tt.uris...)) + if tt.wantErrMsg != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErrMsg) + return + } + require.NoError(t, err) + assert.Equal(t, tt.want, got) + }) + } +} + +func TestGroupChangesByProvider(t *testing.T) { + ctrl := gomock.NewController(t) + ghProvider := changeprovmock.NewMockChangeProvider(ctrl) + phabProvider := changeprovmock.NewMockChangeProvider(ctrl) + gitProvider := changeprovmock.NewMockChangeProvider(ctrl) + + tests := []struct { + name string + params Params + uris []string + wantGroups map[changeprovider.ChangeProvider][]matchedURI + wantErrMsg string + }{ + { + name: "all github URIs grouped together", + params: Params{GitHub: ghProvider}, + uris: []string{githubURI1, githubURI2}, + wantGroups: map[changeprovider.ChangeProvider][]matchedURI{ + ghProvider: { + {index: 0, uri: githubURI1}, + {index: 1, uri: githubURI2}, + }, + }, + }, + { + name: "mixed github and phabricator", + params: Params{GitHub: ghProvider, Phabricator: phabProvider}, + uris: []string{githubURI1, phabURI1, githubURI2}, + wantGroups: map[changeprovider.ChangeProvider][]matchedURI{ + ghProvider: { + {index: 0, uri: githubURI1}, + {index: 2, uri: githubURI2}, + }, + phabProvider: { + {index: 1, uri: phabURI1}, + }, + }, + }, + { + name: "all three change types", + params: Params{GitHub: ghProvider, Phabricator: phabProvider, Git: gitProvider}, + uris: []string{phabURI1, gitURI1, githubURI1}, + wantGroups: map[changeprovider.ChangeProvider][]matchedURI{ + phabProvider: { + {index: 0, uri: phabURI1}, + }, + gitProvider: { + {index: 1, uri: gitURI1}, + }, + ghProvider: { + {index: 2, uri: githubURI1}, + }, + }, + }, + { + name: "match error is propagated", + params: Params{GitHub: ghProvider}, + uris: []string{githubURI1, "bogus://unknown/uri"}, + wantErrMsg: "does not match any known change type", + }, + { + name: "empty URI list returns empty groups", + params: Params{GitHub: ghProvider}, + uris: nil, + wantGroups: map[changeprovider.ChangeProvider][]matchedURI{}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &provider{ + github: tt.params.GitHub, + phabricator: tt.params.Phabricator, + git: tt.params.Git, + } + + got, err := p.groupChangesByProvider(tt.uris) + if tt.wantErrMsg != "" { + require.Error(t, err) + assert.Contains(t, err.Error(), tt.wantErrMsg) + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantGroups, got) + }) + } +} + +func TestMatchURIToChangeProvider(t *testing.T) { + ctrl := gomock.NewController(t) + ghProvider := changeprovmock.NewMockChangeProvider(ctrl) + phabProvider := changeprovmock.NewMockChangeProvider(ctrl) + gitProvider := changeprovmock.NewMockChangeProvider(ctrl) + + tests := []struct { + name string + params Params + uri string + wantProvider changeprovider.ChangeProvider + wantErrMsg string + }{ + { + name: "github URI matches github provider", + params: Params{GitHub: ghProvider, Phabricator: phabProvider, Git: gitProvider}, + uri: githubURI1, + wantProvider: ghProvider, + }, + { + name: "phabricator URI matches phabricator provider", + params: Params{GitHub: ghProvider, Phabricator: phabProvider, Git: gitProvider}, + uri: phabURI1, + wantProvider: phabProvider, + }, + { + name: "git URI matches git provider", + params: Params{GitHub: ghProvider, Phabricator: phabProvider, Git: gitProvider}, + uri: gitURI1, + wantProvider: gitProvider, + }, + { + name: "unrecognized URI returns error", + params: Params{GitHub: ghProvider}, + uri: "bogus://unknown/uri", + wantErrMsg: "does not match any known change type", + }, + { + name: "github URI with nil github provider returns error", + params: Params{Phabricator: phabProvider}, + uri: githubURI1, + wantErrMsg: "no GitHub provider is configured", + }, + { + name: "phabricator URI with nil phabricator provider returns error", + params: Params{GitHub: ghProvider}, + uri: phabURI1, + wantErrMsg: "no Phabricator provider is configured", + }, + { + name: "git URI with nil git provider returns error", + params: Params{GitHub: ghProvider}, + uri: gitURI1, + wantErrMsg: "no git provider is configured", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &provider{ + github: tt.params.GitHub, + phabricator: tt.params.Phabricator, + git: tt.params.Git, + } + + got, err := p.matchURIToChangeProvider(tt.uri) + if tt.wantErrMsg != "" { + require.ErrorContains(t, err, tt.wantErrMsg) + assert.Nil(t, got) + return + } + require.NoError(t, err) + assert.Equal(t, tt.wantProvider, got) + }) + } +}