Skip to content

Commit

Permalink
refactor: dublicate code
Browse files Browse the repository at this point in the history
  • Loading branch information
Angular2Guy committed Dec 31, 2024
1 parent ea40d20 commit 1bfa967
Showing 1 changed file with 21 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,59 +198,62 @@ private String createHourDayAvg() {
private void createCbHourlyAvg() {
LOG.info("createCbHourlyAvg()");
LocalDateTime startAll = LocalDateTime.now();
MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_HOUR_COL, QuoteCb.class, true);
MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_HOUR_COL, QuoteCb.class, true);
Calendar now = Calendar.getInstance();
now.setTime(Date.from(LocalDate.now().atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()));
final var timeFrames = this.createTimeFrames(timeFrame, now);
if (this.cpuConstraint) {
timeFrames.stream().forEachOrdered(timeFrame1 -> processHourTimeFrame(timeFrame1));
} else {
timeFrames.stream().forEachOrdered(timeFrame1 -> this.processTimeFrame(timeFrame1, false));
} else {
try (ForkJoinPool customThreadPool = new ForkJoinPool(2)) {
customThreadPool.submit(() -> timeFrames.parallelStream().forEachOrdered(timeFrame1 -> processHourTimeFrame(timeFrame1)));
customThreadPool.submit(() -> timeFrames.parallelStream()
.forEachOrdered(timeFrame1 -> this.processTimeFrame(timeFrame1, false)));
customThreadPool.shutdown();
}
}
}
LOG.info(this.serviceUtils.createAvgLogStatement(startAll, "Prepared Coinbase Hourly Data Time:"));
}

private void processHourTimeFrame(MyTimeFrame timeFrame1) {
private void processTimeFrame(MyTimeFrame timeFrame1, boolean isDay) {
Date start = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("dd.MM.yyyy");
final var nonZeroProperties = new AtomicInteger(0);
Query query = new Query();
query.addCriteria(
Criteria.where(DtoUtils.CREATEDAT).gt(timeFrame1.begin().getTime()).lt(timeFrame1.end().getTime()));
// Coinbase
final var logFailed = String.format("Coinbase prepare %s data failed", isDay ? "day" : "hour");
Mono<Collection<QuoteCb>> collectCb = this.myMongoRepository.find(query, QuoteCb.class)
.timeout(Duration.ofSeconds(5L)).doOnError(ex -> LOG.warn("Coinbase prepare hour data failed", ex))
.timeout(Duration.ofSeconds(10L)).doOnError(ex -> LOG.warn(logFailed, ex))
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList()
.map(quotes -> makeCbQuoteHour(quotes, timeFrame1.begin(), timeFrame1.end()));
.map(quotes -> isDay ? this.makeCbQuoteDay(quotes, timeFrame1.begin(), timeFrame1.end())
: this.makeCbQuoteHour(quotes, timeFrame1.begin(), timeFrame1.end()));
collectCb.filter(Predicate.not(Collection::isEmpty))
.map(myColl -> countRelevantProperties(nonZeroProperties, myColl))
.flatMap(myColl -> this.myMongoRepository.insertAll(Mono.just(myColl), CB_HOUR_COL)
.timeout(Duration.ofSeconds(5L))
.doOnError(ex -> LOG.warn("Coinbase prepare hour data failed", ex))
.map(myColl -> this.countRelevantProperties(nonZeroProperties, myColl))
.flatMap(myColl -> this.myMongoRepository.insertAll(Mono.just(myColl), isDay ? CB_DAY_COL : CB_HOUR_COL)
.timeout(Duration.ofSeconds(10L)).doOnError(ex -> LOG.warn(logFailed, ex))
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList())
.subscribeOn(this.mongoScheduler).block();
LOG.info("Prepared Coinbase Hour Data for: " + sdf.format(timeFrame1.begin().getTime()) + " Time: "
LOG.info(String.format("Prepared Coinbase %s Data for: ", isDay ? "Day" : "Hour") + sdf.format(timeFrame1.begin().getTime()) + " Time: "
+ (new Date().getTime() - start.getTime()) + "ms" + " 0 < properties: " + nonZeroProperties.get());
}

private void createCbDailyAvg() {
LOG.info("createCbDailyAvg()");
LocalDateTime startAll = LocalDateTime.now();
final MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_DAY_COL, QuoteCb.class, false);
final MyTimeFrame timeFrame = this.serviceUtils.createTimeFrame(CB_DAY_COL, QuoteCb.class, false);
final Calendar now = Calendar.getInstance();
now.setTime(Date.from(LocalDate.now().atStartOfDay().atZone(ZoneId.systemDefault()).toInstant()));
final var timeFrames = this.createTimeFrames(timeFrame, now);
if (this.cpuConstraint) {
timeFrames.stream().forEachOrdered(timeFrame1 -> processDayTimeFrame(timeFrame1));
timeFrames.stream().forEachOrdered(timeFrame1 -> this.processTimeFrame(timeFrame1, true));
} else {
try (ForkJoinPool customThreadPool = new ForkJoinPool(2)) {
customThreadPool.submit(() -> timeFrames.parallelStream().forEachOrdered(timeFrame1 -> processDayTimeFrame(timeFrame1)));
customThreadPool.submit(() -> timeFrames.parallelStream()
.forEachOrdered(timeFrame1 -> this.processTimeFrame(timeFrame1, true)));
customThreadPool.shutdown();
}
}
}
}
LOG.info(this.serviceUtils.createAvgLogStatement(startAll, "Prepared Coinbase Daily Data Time:"));
}

Expand All @@ -275,29 +278,6 @@ private Calendar nextDay(Calendar begin) {
return begin;
}

private void processDayTimeFrame(MyTimeFrame timeFrame1) {
Date start = new Date();
final SimpleDateFormat sdf = new SimpleDateFormat("dd.MM.yyyy");
final var nonZeroProperties = new AtomicInteger(0);
Query query = new Query();
query.addCriteria(
Criteria.where(DtoUtils.CREATEDAT).gt(timeFrame1.begin().getTime()).lt(timeFrame1.end().getTime()));
// Coinbase
Mono<Collection<QuoteCb>> collectCb = this.myMongoRepository.find(query, QuoteCb.class)
.timeout(Duration.ofSeconds(5L)).doOnError(ex -> LOG.warn("Coinbase prepare day data failed", ex))
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList()
.map(quotes -> makeCbQuoteDay(quotes, timeFrame1.begin(), timeFrame1.end()));
collectCb.filter(Predicate.not(Collection::isEmpty))
.map(myColl -> countRelevantProperties(nonZeroProperties, myColl))
.flatMap(myColl -> this.myMongoRepository.insertAll(Mono.just(myColl), CB_DAY_COL)
.timeout(Duration.ofSeconds(5L))
.doOnError(ex -> LOG.warn("Coinbase prepare day data failed", ex))
.onErrorResume(ex -> Mono.empty()).subscribeOn(this.mongoScheduler).collectList())
.subscribeOn(this.mongoScheduler).block();
LOG.info("Prepared Coinbase Day Data for: " + sdf.format(timeFrame1.begin().getTime()) + " Time: "
+ (new Date().getTime() - start.getTime()) + "ms" + " 0 < properties: " + nonZeroProperties.get());
}

private Collection<QuoteCb> countRelevantProperties(final AtomicInteger nonZeroProperties,
Collection<QuoteCb> myColl) {
var relevantProperties = myColl.stream().flatMap(myQuote -> Stream.of(this.propertiesNonZero(myQuote)))
Expand Down

0 comments on commit 1bfa967

Please sign in to comment.