Skip to content

Commit

Permalink
Add group_by support to MPD loaders
Browse files Browse the repository at this point in the history
This change introduces a new "group_by" option on the MPD-based loaders.
When provided, the loaders use these fields to group the songs they read
before adding the items to the ShuffleChain (using the new Shuffle-chain
group support). The grouping is done via a basic unordered_map: Keys are
vectors of tag values corresponding to the group_by fields, and values
are the URIs in the group. Once all Groups are generated, they are added
to the chain.
  • Loading branch information
joshkunz committed May 10, 2020
1 parent 9dc2248 commit d5b5049
Show file tree
Hide file tree
Showing 8 changed files with 125 additions and 15 deletions.
6 changes: 5 additions & 1 deletion meson.build
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ absl_libs = [
'int128',
'str_format_internal',
'strings_internal',
'strings'
'strings',

# Via Hash:
'hash',
'city',
]

absl_deps = []
Expand Down
39 changes: 37 additions & 2 deletions src/load.cc
Original file line number Diff line number Diff line change
@@ -1,20 +1,54 @@
#include "load.h"

#include <iostream>
#include <unordered_map>
#include <vector>

#include <absl/hash/hash.h>
#include <absl/strings/str_format.h>

namespace ashuffle {

namespace {

// A Group is a vector of field values, present or not.
typedef std::vector<std::optional<std::string>> Group;

// A GroupMap is a mapping from Groups to song URI vectors of the URIs in the
// given group.
typedef std::unordered_map<Group, std::vector<std::string>, absl::Hash<Group>>
GroupMap;

} // namespace

/* build the list of songs to shuffle from using MPD */
void MPDLoader::Load(ShuffleChain *songs) {
GroupMap groups;

std::unique_ptr<mpd::SongReader> reader = mpd_->ListAll();
while (!reader->Done()) {
std::unique_ptr<mpd::Song> song = *reader->Next();
if (!Verify(*song)) {
continue;
}
songs->Add(song->URI());

if (group_by_.empty()) {
songs->Add(song->URI());
continue;
}
Group group;
for (auto &field : group_by_) {
group.emplace_back(song->Tag(field));
}
groups[group].push_back(song->URI());
}

if (group_by_.empty()) {
return;
}

for (auto &&[_, group] : groups) {
songs->Add(group);
}
}

Expand All @@ -28,8 +62,9 @@ bool MPDLoader::Verify(const mpd::Song &song) {
}

FileMPDLoader::FileMPDLoader(mpd::MPD *mpd, const std::vector<Rule> &ruleset,
const std::vector<enum mpd_tag_type> &group_by,
std::istream *file)
: MPDLoader(mpd, ruleset), file_(file) {
: MPDLoader(mpd, ruleset, group_by), file_(file) {
for (std::string uri; std::getline(*file_, uri);) {
valid_uris_.emplace_back(uri);
}
Expand Down
9 changes: 8 additions & 1 deletion src/load.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@
#include <string_view>
#include <vector>

#include <mpd/tag.h>

#include "mpd.h"
#include "rule.h"
#include "shuffle.h"
Expand All @@ -25,7 +27,10 @@ class MPDLoader : public Loader {
public:
~MPDLoader() override = default;
MPDLoader(mpd::MPD* mpd, const std::vector<Rule>& ruleset)
: mpd_(mpd), rules_(ruleset){};
: MPDLoader(mpd, ruleset, std::vector<enum mpd_tag_type>()){};
MPDLoader(mpd::MPD* mpd, const std::vector<Rule>& ruleset,
const std::vector<enum mpd_tag_type>& group_by)
: mpd_(mpd), rules_(ruleset), group_by_(group_by){};

void Load(ShuffleChain* into) override;

Expand All @@ -35,12 +40,14 @@ class MPDLoader : public Loader {
private:
mpd::MPD* mpd_;
const std::vector<Rule>& rules_;
const std::vector<enum mpd_tag_type> group_by_;
};

class FileMPDLoader : public MPDLoader {
public:
~FileMPDLoader() override = default;
FileMPDLoader(mpd::MPD* mpd, const std::vector<Rule>& ruleset,
const std::vector<enum mpd_tag_type>& group_by,
std::istream* file);

protected:
Expand Down
10 changes: 6 additions & 4 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,13 @@ const int kWindowSize = 7;

std::unique_ptr<Loader> BuildLoader(mpd::MPD* mpd, const Options& opts) {
if (opts.file_in != nullptr && opts.check_uris) {
return std::make_unique<FileMPDLoader>(mpd, opts.ruleset, opts.file_in);
return std::make_unique<FileMPDLoader>(mpd, opts.ruleset, opts.group_by,
opts.file_in);
} else if (opts.file_in != nullptr) {
return std::make_unique<FileLoader>(opts.file_in);
}

return std::make_unique<MPDLoader>(mpd, opts.ruleset);
return std::make_unique<MPDLoader>(mpd, opts.ruleset, opts.group_by);
}

} // namespace
Expand Down Expand Up @@ -60,8 +61,9 @@ int main(int argc, const char* argv[]) {

Options options = std::move(std::get<Options>(parse));

if (!options.group_by.empty()) {
std::cerr << "-g/--group-by not yet supported" << std::endl;
if (!options.check_uris && !options.group_by.empty()) {
std::cerr << "-g/--group-by not supported with no-check" << std::endl;
exit(EXIT_FAILURE);
}

std::function<std::string()> pass_f = [] {
Expand Down
6 changes: 4 additions & 2 deletions t/integration/ashuffle/ashuffle.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ type Ashuffle struct {
cancelFunc func()

Stdout *bytes.Buffer
Stderr *bytes.Buffer
}

const MaxShutdownWait = 5 * time.Second
Expand Down Expand Up @@ -118,9 +119,9 @@ func New(ctx context.Context, path string, opts *Options) (*Ashuffle, error) {
runCtx, cancel := context.WithCancel(ctx)
var args []string
cmd := exec.CommandContext(runCtx, path, args...)
stdout := bytes.Buffer{}
var stdout, stderr bytes.Buffer
cmd.Stdout = &stdout
cmd.Stderr = os.Stderr
cmd.Stderr = &stderr

if opts != nil {
cmd.Args = append([]string{path}, opts.Args...)
Expand All @@ -144,5 +145,6 @@ func New(ctx context.Context, path string, opts *Options) (*Ashuffle, error) {
cmd: cmd,
cancelFunc: cancel,
Stdout: &stdout,
Stderr: &stderr,
}, nil
}
40 changes: 40 additions & 0 deletions t/integration/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,42 @@ func TestStartup(t *testing.T) {
mpdi.Shutdown()
}

func TestStartupFail(t *testing.T) {
t.Parallel()
ctx := context.Background()

tests := []struct {
desc string
options *ashuffle.Options
wantStderrContains string
}{
{
desc: "No MPD server running",
wantStderrContains: "could not connect to mpd",
},
{
desc: "Group by and no-check",
options: &ashuffle.Options{Args: []string{"-g", "album", "--no-check"}},
wantStderrContains: "group-by not supported with no-check",
},
}

for _, test := range tests {
as, err := ashuffle.New(ctx, ashuffleBin, test.options)
if err != nil {
t.Errorf("failed to start ashuffle: %v", err)
continue
}
if err := as.Shutdown(ashuffle.ShutdownSoft); err == nil {
t.Errorf("ashuffle shutdown cleanly, but we wanted an error: %v", err)
}
if stderr := as.Stderr.String(); !strings.Contains(stderr, test.wantStderrContains) {
t.Errorf("want stderr contains %q, got stderr:\n%s", test.wantStderrContains, stderr)
}
}

}

func TestShuffleOnce(t *testing.T) {
t.Parallel()
ctx := context.Background()
Expand Down Expand Up @@ -425,6 +461,10 @@ func TestFastStartup(t *testing.T) {
name: "from file, with filter",
args: []string{"-f", dbF.Path(), "-o", "1", "-e", "artist", "AA"},
},
{
name: "from mpd, group-by artist",
args: []string{"-f", dbF.Path(), "-o", "1", "-g", "artist"},
},
}

for _, test := range tests {
Expand Down
9 changes: 5 additions & 4 deletions t/integration/mpd/mpd.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"path/filepath"
Expand Down Expand Up @@ -298,17 +297,19 @@ func New(ctx context.Context, opts *Options) (*Instance, error) {
connectBackoff := backoff.WithContext(backoff.NewConstantBackOff(mpdConnectBackoff), connectCtx)

var cli *mpdc.Client
backoff.Retry(func() error {
err = backoff.Retry(func() error {
onceCli, err := mpdc.Dial("unix", root.socket)
if err != nil {
log.Printf("failed to connect to mpd: %v", err)
return err
}
cli = onceCli
return nil
}, connectBackoff)
if err != nil {
return nil, fmt.Errorf("failed to connect to mpd at %s: %v", root.socket, err)
}
if cli == nil {
return nil, fmt.Errorf("failed to connect to mpd at %s", root.socket)
panic("backoff did not return an error. This should not happen.")
}

if err != nil {
Expand Down
21 changes: 20 additions & 1 deletion t/load_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,23 @@ TEST(MPDLoaderTest, WithFilter) {
EXPECT_THAT(chain.Items(), WhenSorted(ContainerEq(want)));
}

TEST(MPDLoaderTest, WithGroup) {
fake::MPD mpd;
mpd.db.push_back(fake::Song("song_a", {{MPD_TAG_ALBUM, "__album__"}}));
mpd.db.push_back(fake::Song("song_b", {{MPD_TAG_ALBUM, "__album__"}}));

std::vector<enum mpd_tag_type> group_by = {MPD_TAG_ARTIST};

ShuffleChain chain;
std::vector<Rule> ruleset;

MPDLoader loader(static_cast<mpd::MPD *>(&mpd), ruleset, group_by);
loader.Load(&chain);

std::vector<std::string> want = {"song_a", "song_b"};
EXPECT_THAT(chain.Pick(), WhenSorted(ContainerEq(want)));
}

std::unique_ptr<std::istream> TestStream(std::vector<std::string> lines) {
return std::make_unique<std::istringstream>(absl::StrJoin(lines, "\n"));
}
Expand Down Expand Up @@ -121,7 +138,9 @@ TEST(FileMPDLoaderTest, Basic) {
});

// step 6. Run! (and validate)
FileMPDLoader loader(static_cast<mpd::MPD *>(&mpd), ruleset, s.get());
std::vector<enum mpd_tag_type> group_by;
FileMPDLoader loader(static_cast<mpd::MPD *>(&mpd), ruleset, group_by,
s.get());
loader.Load(&chain);

std::vector<std::vector<std::string>> want = {{song_a.URI()},
Expand Down

0 comments on commit d5b5049

Please sign in to comment.