Skip to content

Commit

Permalink
payment wip
Browse files Browse the repository at this point in the history
  • Loading branch information
riccardobl committed May 27, 2024
1 parent ddb90b9 commit 8ed4940
Show file tree
Hide file tree
Showing 6 changed files with 322 additions and 91 deletions.
111 changes: 90 additions & 21 deletions src/Job.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { Event } from "nostr-tools";

import { Job as _Job, JobInput, Log, JobState, JobStatus, JobResult, JobParam } from "openagents-grpc-proto";
import { Job as _Job, Payment, JobInput, Log, JobState, JobStatus, JobResult, JobParam, Bid, PaymentStatus } from "openagents-grpc-proto";

import Utils from "./Utils";
import {
Expand Down Expand Up @@ -46,14 +46,15 @@ export default class Job implements _Job {
// result: { id: "", content: "", timestamp: 0 },
// acceptedByNode: "",
// };
public results: JobState[]=[];
public results: JobState[] = [];
private maxEventDuration: number;
public maxExecutionTime: number;
public outputFormat: string = "application/json";
public nodeId: string = "";
public encrypted: boolean = false;
public userId: string = "";
private minWorkers: number;
public bid: Bid;

toJSON() {
return {
Expand All @@ -78,6 +79,7 @@ export default class Job implements _Job {
userId: this.userId,
results: this.results,
minWorkers: this.minWorkers,
bid:this.bid
};
}
constructor(
Expand Down Expand Up @@ -139,7 +141,7 @@ export default class Job implements _Job {
this.userId = userId;
}

if(minWorkers)this.minWorkers = minWorkers ;
if (minWorkers) this.minWorkers = minWorkers;
}

merge(event: Event, defaultRelays: Array<string>) {
Expand Down Expand Up @@ -172,8 +174,11 @@ export default class Job implements _Job {
const relays: Array<string> = Utils.getTagVars(event, ["relays"])[0] || defaultRelays;
const expectedOutputFormat: string =
Utils.getTagVars(event, ["output"])[0][0] || "application/json";
// const bid = Utils.getTagVars(event, ["bid"], 1)[0];
// const t = Utils.getTagVars(event, ["t"], 1)[0];

const bid = Utils.getTagVars(event, ["bid"])[0][0];
const t = Utils.getTagVars(event, ["t"])[0][0];
const bidproto="lightning";

const description: string =
Utils.getTagVars(event, ["about"])[0][0] ||
Utils.getTagVars(event, ["param", "description"])[0][0] ||
Expand Down Expand Up @@ -229,15 +234,23 @@ export default class Job implements _Job {
this.relays = [];
this.outputFormat = expectedOutputFormat;
if (minWorkers) this.minWorkers = minWorkers;
// this.results = {};
// this.states = {};
// this.results = {};
// this.states = {};
this.input = inputs;
this.kind = kind;
for (const r of relays) {
if (!this.relays.includes(r)) {
this.relays.push(r);
}
}

// calculate bid for each worker
this.bid={
amount: Number(bid) / minWorkers,
currency: t,
protocol: bidproto
};

} else if (event.kind == 7000 || (event.kind >= 6000 && event.kind <= 6999)) {
const e: Array<string> = Utils.getTagVars(event, ["e"])[0];
const jobId: string = e[0];
Expand Down Expand Up @@ -284,6 +297,7 @@ export default class Job implements _Job {
timestamp: Date.now(),
result: { id: "", content: "", timestamp: 0 },
acceptedByNode: nodeId,
paymentRequests: [],
};
this.results.push(state);
}
Expand Down Expand Up @@ -355,17 +369,17 @@ export default class Job implements _Job {
}
}

isAvailable(nodeId:string) {
isAvailable(nodeId: string) {
if (this.isExpired()) return false;

// if enough successes => not available
let successes = 0;
for (const state of this.results) {
// if success by the same node => not available
if(state.acceptedByNode==nodeId&&state.acceptedBy==this.provider) return false;
if (state.acceptedByNode == nodeId && state.acceptedBy == this.provider) return false;
if (state.status == JobStatus.SUCCESS) {
successes++;
}
}
}
if (successes >= this.minWorkers) return false;

Expand All @@ -378,13 +392,12 @@ export default class Job implements _Job {
Date.now() - state.acceptedAt < this.maxExecutionTime
) {
// if processing by the same node => not available
if(state.acceptedByNode==nodeId&&state.acceptedBy==this.provider) return false;
if (state.acceptedByNode == nodeId && state.acceptedBy == this.provider) return false;
processing++;
}
}
if (processing >= this.minWorkers) return false;


// otherwise available
return true;
}
Expand All @@ -395,12 +408,12 @@ export default class Job implements _Job {

async accept(nodeId: string): Promise<Array<EventTemplate>> {
const t = Date.now();
// add state immediately

// add state immediately
let state = this.results.find((s) => {
return s.acceptedByNode == nodeId&&s.acceptedBy==this.provider;
return s.acceptedByNode == nodeId && s.acceptedBy == this.provider;
});
if(!state){
if (!state) {
state = {
logs: [],
status: JobStatus.PENDING,
Expand All @@ -409,17 +422,16 @@ export default class Job implements _Job {
timestamp: Date.now(),
result: { id: "", content: "", timestamp: 0 },
acceptedByNode: nodeId,
paymentRequests: [],
};
this.results.push(state);
}else{
} else {
state.status = JobStatus.PENDING;
state.acceptedAt = Date.now();
state.timestamp = Date.now();
state.timestamp = Date.now();
}
///



const customerPublicKey = this.customerPublicKey;
let feedbackEvent: EventTemplate = {
kind: 7000,
Expand Down Expand Up @@ -467,7 +479,7 @@ export default class Job implements _Job {
async output(nodeId: string, outputBy: string, data: string): Promise<Array<EventTemplate>> {
// const t = Date.now();
const resultEvent: EventTemplate = {
kind: this.kind+1000,
kind: this.kind + 1000,
content: data,
created_at: Math.floor(Date.now() / 1000),
tags: [
Expand Down Expand Up @@ -505,11 +517,68 @@ export default class Job implements _Job {
].filter((t) => t),
};
events.push(feedbackEvent);

// request payment
if(this.bid){
// create invoice
// const nodeNwc;
const payment: Payment = {
id: "",
amount: this.bid.amount,
currency: this.bid.currency,
protocol: this.bid.protocol,
data: "invoice",
status: PaymentStatus.PAYMENT_PENDING
};


}

// this.state.status = JobStatus.SUCCESS;
// this.state.timestamp = Date.now();
return events;
}

registerPayment(nodeId:string, invoice:string){
const state = this.results.find((s) => {
return s.acceptedByNode == nodeId;
});
if (!state) {
throw new Error("Invalid node");
}
const payment: Payment = {
id: invoice,
amount: this.bid.amount,
currency: this.bid.currency,
protocol: this.bid.protocol,
data: invoice,
status: PaymentStatus.PAYMENT_RECEIVED,
};
state.paymentRequests.push(payment);

if (!state) {
state = {
logs: [],
status: JobStatus.PENDING,
acceptedAt: Date.now(),
acceptedBy: this.provider,
timestamp: Date.now(),
result: { id: "", content: "", timestamp: 0 },
acceptedByNode: nodeId,
paymentRequests: [],
};
this.results.push(state);
} else {
state.status = JobStatus.PENDING;
state.acceptedAt = Date.now();
state.timestamp = Date.now();
}
}

getBid(){
return this.bid;
}

async log(nodeId: string, pk: string, log: string): Promise<Array<EventTemplate>> {
const t = Date.now();
const feedbackEvent: EventTemplate = {
Expand Down
47 changes: 47 additions & 0 deletions src/NWCAdapter.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
import * as GRPC from "@grpc/grpc-js";
import Utils from "./Utils";
import { generateSecretKey, getPublicKey, Event } from "nostr-tools";
import Logger from "./Logger";
import { bytesToHex, hexToBytes } from "@noble/hashes/utils";
import NostrConnector from "./NostrConnector";

export default class NWCAdapter {
protected logger = Logger.get(this.constructor.name);
private conn: NostrConnector;
constructor(conn:NostrConnector) {
this.conn=conn;
}


adaptNodeService(
poolPublicKey: string,
data: [GRPC.ServiceDefinition, GRPC.UntypedServiceImplementation]
): [GRPC.ServiceDefinition, GRPC.UntypedServiceImplementation] {
let [def, impl] = data;

impl = Object.fromEntries(
Object.entries(impl).map(([methodName, methodImplementation]: [any, any]) => [
methodName,
async (call, callback) => {
try {
const metadata = call.metadata.getMap();
const nwc: string = metadata["nwc"] || "";
if (nwc) {
const nwcData = Utils.parseNWC(nwc);
if (nwcData){
call.metadata.set("nwc-data", JSON.stringify(nwcData));
if (nwcData.relay) this.conn.addExtraRelays([nwcData.relay]);
}
}
methodImplementation(call, callback);
} catch (e) {
this.logger.error("Error", e);
throw e;
}
},
])
);

return [def, impl];
}
}
Loading

0 comments on commit 8ed4940

Please sign in to comment.