Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

JSON Streaming #412

Merged
merged 25 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
d8b7e9f
wip: ingest streaming
rvazarkar Feb 12, 2024
c0b4bb7
wip: ingest streaming
rvazarkar Feb 12, 2024
c9448a6
chore: code golf
zinic Feb 12, 2024
4759996
wip: ingest streaming
rvazarkar Feb 12, 2024
e96836b
wip: wip commit
rvazarkar Feb 12, 2024
a37832f
Merge remote-tracking branch 'origin/main' into BED-4114
rvazarkar Feb 12, 2024
6886b98
feat: json file streaming implementation
rvazarkar Feb 12, 2024
46fcd82
chore: static analysis fixes
rvazarkar Feb 13, 2024
057c1f9
fix: incorrect comparison breaking tests
rvazarkar Feb 13, 2024
dcf6386
fix: don't error out immediately on invalid meta tag
rvazarkar Feb 13, 2024
2edf4ba
chore: add missing copyright
rvazarkar Feb 13, 2024
1e6dcad
chore: add missing copyright
rvazarkar Feb 13, 2024
4056358
example: added example unit testing to SeekToDataTag
superlinkx Feb 13, 2024
211ff0b
chore: add some more tests, and validate data tag during meta tag checks
rvazarkar Feb 14, 2024
13a3a86
chore: export variables for use in bhe
rvazarkar Feb 14, 2024
6e3273c
Merge remote-tracking branch 'origin/main' into BED-4114
rvazarkar Feb 14, 2024
4177dd3
fix: initialize azure struct
rvazarkar Feb 14, 2024
8f81ed1
chore: remove unnecessary change
rvazarkar Feb 14, 2024
8d8232b
chore: use var blocks, optimize reallocations
rvazarkar Feb 14, 2024
fb0a559
chore: add comment
rvazarkar Feb 14, 2024
d720c3c
chore: add comment
rvazarkar Feb 14, 2024
10231a2
chore: more cleanup
rvazarkar Feb 14, 2024
998f3c6
chore: fix static analysis nit
rvazarkar Feb 14, 2024
4f64ee6
Merge branch 'main' into BED-4114
rvazarkar Feb 14, 2024
86bf46c
Merge remote-tracking branch 'origin/main' into BED-4114
rvazarkar Feb 15, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 0 additions & 14 deletions cmd/api/src/daemons/datapipe/azure_convertors.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,20 +36,6 @@ const (
PrincipalTypeUser = "User"
)

func convertAzureData(data []json.RawMessage) ConvertedAzureData {
converted := CreateConvertedAzureData(0)
for _, bytes := range data {
var data AzureBase
if err := json.Unmarshal(bytes, &data); err != nil {
log.Error().Fault(err).Msg("Failed to convert Azure data")
} else {
convert := getKindConverter(data.Kind)
convert(data.Data, &converted)
}
}
return converted
}

func getKindConverter(kind enums.Kind) func(json.RawMessage, *ConvertedAzureData) {
switch kind {
case enums.KindAZApp:
Expand Down
279 changes: 102 additions & 177 deletions cmd/api/src/daemons/datapipe/convertors.go

Large diffs are not rendered by default.

153 changes: 153 additions & 0 deletions cmd/api/src/daemons/datapipe/decoders.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe

import (
"github.com/specterops/bloodhound/dawgs/graph"
"github.com/specterops/bloodhound/ein"
"github.com/specterops/bloodhound/log"
"io"
)

type ConversionFunc[T any] func(decoded T, converted *ConvertedData)
rvazarkar marked this conversation as resolved.
Show resolved Hide resolved

func decodeBasicData[T any](batch graph.Batch, reader io.ReadSeeker, conversionFunc ConversionFunc[T]) error {
decoder, err := CreateIngestDecoder(reader)
if err != nil {
return err
}

var (
count = 0
batchCount = 0
convertedData ConvertedData
)

for decoder.More() {
var decodeTarget T
rvazarkar marked this conversation as resolved.
Show resolved Hide resolved
if err := decoder.Decode(&decodeTarget); err != nil {
log.Errorf("Error decoding %T object: %v", decodeTarget, err)
} else {
count++
conversionFunc(decodeTarget, &convertedData)
}

if count == ingestCountThreshold {
batchCount++
IngestBasicData(batch, convertedData)
convertedData.Clear()
count = 0
}
}

if count > 0 {
IngestBasicData(batch, convertedData)
}

return nil
}

func decodeGroupData(batch graph.Batch, reader io.ReadSeeker) error {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

would tossing in the ingest function in decodeBasicData eliminate the need for these 3 special copy pasta ❄️ s?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

They take different types

decoder, err := CreateIngestDecoder(reader)
if err != nil {
return err
}

convertedData := ConvertedGroupData{}
rvazarkar marked this conversation as resolved.
Show resolved Hide resolved
var group ein.Group
count := 0
for decoder.More() {
if err := decoder.Decode(&group); err != nil {
log.Errorf("Error decoding group object: %v", err)
} else {
count++
convertGroupData(group, &convertedData)
if count == ingestCountThreshold {
IngestGroupData(batch, convertedData)
convertedData.Clear()
count = 0
}
}
}

if count > 0 {
IngestGroupData(batch, convertedData)
}

return nil
}

func decodeSessionData(batch graph.Batch, reader io.ReadSeeker) error {
decoder, err := CreateIngestDecoder(reader)
if err != nil {
return err
}

convertedData := ConvertedSessionData{}
rvazarkar marked this conversation as resolved.
Show resolved Hide resolved
var session ein.Session
count := 0
for decoder.More() {
if err := decoder.Decode(&session); err != nil {
log.Errorf("Error decoding session object: %v", err)
} else {
count++
convertSessionData(session, &convertedData)
if count == ingestCountThreshold {
IngestSessions(batch, convertedData.SessionProps)
convertedData.Clear()
count = 0
}
}
}

if count > 0 {
IngestSessions(batch, convertedData.SessionProps)
}

return nil
}

func decodeAzureData(batch graph.Batch, reader io.ReadSeeker) error {
decoder, err := CreateIngestDecoder(reader)
if err != nil {
return err
}

convertedData := ConvertedAzureData{}
rvazarkar marked this conversation as resolved.
Show resolved Hide resolved
var data AzureBase
count := 0
for decoder.More() {
if err := decoder.Decode(&data); err != nil {
log.Errorf("Error decoding azure object: %v", err)
} else {
convert := getKindConverter(data.Kind)
convert(data.Data, &convertedData)
count++
if count == ingestCountThreshold {
IngestAzureData(batch, convertedData)
convertedData.Clear()
count = 0
}
}
}

if count > 0 {
IngestAzureData(batch, convertedData)
}

return nil
}
83 changes: 83 additions & 0 deletions cmd/api/src/daemons/datapipe/fileops.go
superlinkx marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
// Copyright 2024 Specter Ops, Inc.
//
// Licensed under the Apache License, Version 2.0
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//
// SPDX-License-Identifier: Apache-2.0

package datapipe

import (
"encoding/json"
"errors"
"fmt"
"io"
)

const (
delimOpenBracket = json.Delim('{')
delimCloseBracket = json.Delim('}')
delimOpenSquareBracket = json.Delim('[')
delimCloseSquareBracket = json.Delim(']')
)

func SeekToDataTag(decoder *json.Decoder) error {
depth := 0
dataTagFound := false
for {
if token, err := decoder.Token(); err != nil {
if errors.Is(err, io.EOF) {
return ErrDataTagNotFound
}

return err
} else {
//Break here to allow for one more token read, which should take us to the "[" token, exactly where we need to be
if dataTagFound {
//Do some extra checks
if typed, ok := token.(json.Delim); !ok {
return ErrInvalidDataTag
} else if typed != delimOpenSquareBracket {
return ErrInvalidDataTag
}
//Break out of our loop if we're in a good spot
return nil
}
switch typed := token.(type) {
case json.Delim:
switch typed {
case delimCloseBracket, delimCloseSquareBracket:
depth--
case delimOpenBracket, delimOpenSquareBracket:
depth++
}
case string:
if !dataTagFound && depth == 1 && typed == "data" {
dataTagFound = true
}
}
}
}
}

func CreateIngestDecoder(reader io.ReadSeeker) (*json.Decoder, error) {
if _, err := reader.Seek(0, io.SeekStart); err != nil {
return nil, fmt.Errorf("error seeking to start of file: %w", err)
} else {
decoder := json.NewDecoder(reader)
if err := SeekToDataTag(decoder); err != nil {
return nil, fmt.Errorf("error seeking to data tag: %w", err)
} else {
return decoder, nil
}
}
}
Loading
Loading