/**
 * Client-side handle for a numeric counter stored in a JSON topic.
 *
 * The instance caches the last counter value it has seen (`value`) and uses
 * it as the expected value of a JSON-patch "test" operation, so increments
 * only succeed when the cached value matches what the topic holds.
 */
class CounterTopic {
  /**
   * Creates the topic with `topicValue` when it does not exist yet; when the
   * update is rejected with a reason starting with "Constraint NoTopic", the
   * value currently stored in the topic is fetched and adopted instead.
   *
   * @param {string} topicPath - Path of the counter topic.
   * @param {object} session - Session exposing `topicUpdate` and `fetchRequest()`.
   * @param {number} [topicValue=0] - Value written when the topic is created.
   * @returns {Promise<CounterTopic>} Handle whose cached value reflects the topic.
   * @throws Rethrows, unchanged, any rejection that is not the no-topic
   *   constraint failure; also propagates errors raised by `fetch`.
   */
  static async build(topicPath, session, topicValue = 0) {
    // `arguments` does not include defaulted parameters, hence 'number|undefined'.
    type(arguments, ['string', /.*/, 'number|undefined']);
    try {
      await session.topicUpdate.set(
        topicPath,
        diffusion.datatypes.json(),
        topicValue,
        {
          specification: new diffusion.topics.TopicSpecification(
            diffusion.topics.TopicType.JSON
          ),
          constraint: diffusion.updateConstraints().noTopic(),
        }
      );
      return new CounterTopic(topicPath, session, topicValue);
    } catch (err) {
      // Only a string `reason` can identify the constraint failure. Anything
      // else (plain Error, missing reason, ...) is rethrown untouched instead
      // of being masked by a TypeError from calling startsWith on undefined.
      const reason = err?.reason;
      if (typeof reason !== 'string' || !reason.startsWith('Constraint NoTopic')) {
        throw err;
      }
      const actualValue = await CounterTopic.fetch(session, topicPath);
      return new CounterTopic(topicPath, session, actualValue);
    }
  }

  /**
   * @param {string} topicPath - Path of the counter topic.
   * @param {object} session - Session used for updates and fetches.
   * @param {number} value - Initial cached counter value.
   */
  constructor(topicPath, session, value) {
    type(arguments, ['string', /.*/, 'number']);
    this.topicPath = topicPath;
    this.session = session;
    this.value = value;
  }

  /**
   * Increments the counter by one using a JSON patch made of a "test" on the
   * cached value followed by a "replace" with the cached value plus one.
   * When the patch reports a failed operation, the cached value is refreshed
   * from the topic and the patch is retried.
   *
   * @returns {Promise<number>} The value written by this call.
   */
  async increment() {
    while (true) {
      // Snapshot before awaiting: another increment() on this instance may
      // change `this.value` while the request is in flight.
      const expected = this.value;
      const next = expected + 1;
      const patch = [
        { op: 'test', path: '', value: expected },
        { op: 'replace', path: '', value: next },
      ];
      const result = await this.session.topicUpdate.applyJsonPatch(
        this.topicPath,
        patch
      );
      if (result.failedOperation === undefined) {
        // Advance the cache only if nobody refreshed it in the meantime;
        // otherwise the newer cached value is kept.
        if (this.value === expected) {
          this.value = next;
        }
        return next;
      }
      this.value = await CounterTopic.fetch(this.session, this.topicPath);
    }
  }

  /**
   * Last counter value known to this instance (no request is made).
   *
   * @returns {number}
   */
  get knownValue() {
    return this.value;
  }

  /**
   * Fetches the value currently stored at `topicPath`.
   *
   * @param {object} session - Session exposing `fetchRequest()`.
   * @param {string} topicPath - Path of the topic to read.
   * @returns {Promise<*>} Value of the first fetch result.
   * @throws {Error} "Topic not found: <path>" when the fetch yields no results.
   */
  static async fetch(session, topicPath) {
    type(arguments, [/.*/, 'string']);
    const fetchResult = await session
      .fetchRequest()
      .withValues(diffusion.datatypes.json())
      .fetch(topicPath);
    const topicRows = fetchResult.results();
    if (topicRows.length < 1) {
      throw new Error(`Topic not found: ${topicPath}`);
    }
    return topicRows[0].value().get();
  }
}