Adaptive Framework  0.9.0
All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
afw_adaptor_journal.c
Go to the documentation of this file.
1 // See the 'COPYING' file in the project root for licensing information.
2 /*
3  * Adaptive Framework Adaptor Journal
4  *
5  * Copyright (c) 2010-2023 Clemson University
6  *
7  */
8 
14 #include "afw_internal.h"
15 
16 
17 
18 void
19 afw_adaptor_internal_journal_prologue(
20  const afw_adaptor_session_t *session,
21  const afw_object_t *journal_entry,
22  afw_xctx_t *xctx)
23 {
24  const afw_value_t *now;
25 
26  now = afw_value_create_dateTime_now_utc(journal_entry->p, xctx);
27  afw_object_set_property(journal_entry, &afw_s_beginTime, now, xctx);
28 }
29 
30 
31 void
33  const afw_adaptor_session_t *session,
34  const afw_object_t *journal_entry,
35  afw_boolean_t modification,
36  afw_xctx_t *xctx)
37 {
38  const afw_value_t *now;
39  const afw_adaptor_session_t *journal_session;
40  const afw_adaptor_journal_t *journal;
41  afw_adaptor_impl_request_t impl_request;
42 
43  now = afw_value_create_dateTime_now_utc(journal_entry->p, xctx);
44  afw_object_set_property(journal_entry, &afw_s_endTime, now, xctx);
45 
47  /* If this is a modification, add event to journal if requested. */
48  if (modification && session->adaptor->impl->journal_adaptor_id) {
49  journal_session = afw_adaptor_session_get_cached(
50  session->adaptor->impl->journal_adaptor_id, true, xctx);
51  journal = afw_adaptor_session_get_journal_interface(journal_session,
52  xctx);
53  afw_memory_clear(&impl_request);
54  afw_adaptor_journal_add_entry(journal, &impl_request,
55  journal_entry, xctx);
56  }
57 }
58 
59 
60 
61 /* _AdaptiveJournalEntry update_object(). */
62 AFW_DEFINE(void)
63 afw_adaptor_journal_entry_consume(
64  const afw_adaptor_session_t *session,
65  const afw_utf8_t *object_id,
66  const afw_object_t *update_object,
67  afw_xctx_t *xctx)
68 {
69  const afw_adaptor_journal_t *journal;
70  afw_boolean_t consumed;
71  afw_boolean_t found;
72  const afw_utf8_t *consumer_id;
73  afw_adaptor_impl_request_t impl_request;
74 
75  /* Get journal interface. */
76  journal = afw_adaptor_session_get_journal_interface(session, xctx);
77  if (!journal) {
78  AFW_THROW_ERROR_FZ(general, xctx,
79  "adaptor_id %" AFW_UTF8_FMT
80  " session get_journal() returned NULL",
81  AFW_UTF8_FMT_ARG(&session->adaptor->adaptor_id));
82  }
83 
84  /* Get consumed property from update object. */
85  consumed = afw_object_old_get_property_as_boolean(update_object,
86  &afw_s_consumed, &found, xctx);
87  if (!found || !consumed) {
88  AFW_THROW_ERROR_Z(general,
90  " update_object() must have consumed property set to true", xctx);
91  }
92 
93  /* Get consumer_id from update object. */
94  consumer_id = afw_object_old_get_property_as_string(update_object,
95  &afw_s_consumerId, xctx);
96  if (!consumer_id) {
97  AFW_THROW_ERROR_Z(general,
99  " update_object() must have consumerId property", xctx);
100  }
101 
102  /* Mark entry consumed. */
103  afw_memory_clear(&impl_request);
104  afw_adaptor_journal_mark_entry_consumed(journal, &impl_request,
105  consumer_id, object_id, xctx);
106 }
107 
108 
109 static const afw_adaptor_journal_t *
110 impl_get_journal_interface(const afw_utf8_t *adaptor_id,
111  afw_boolean_t begin_transaction, afw_xctx_t *xctx)
112 {
113  const afw_adaptor_session_t *session;
114  const afw_adaptor_journal_t *journal;
115 
116  /* Get an active session. */
117  session = afw_adaptor_session_get_cached(adaptor_id, begin_transaction, xctx);
118 
119  journal = afw_adaptor_session_get_journal_interface(session, xctx);
120  if (!journal) goto error;
121  return journal;
122 
123 error:
124  AFW_THROW_ERROR_FZ(general, xctx,
125  "Adaptor %" AFW_UTF8_FMT " does not support journal",
126  AFW_UTF8_FMT_ARG(adaptor_id));
127 }
128 
129 
130 /* Journal - get first entry. */
131 AFW_DEFINE(const afw_object_t *)
133  const afw_utf8_t *adaptor_id,
134  const afw_pool_t *p,
135  afw_xctx_t *xctx)
136 {
137  const afw_adaptor_journal_t *journal;
138  const afw_object_t *result;
139  afw_adaptor_impl_request_t impl_request;
140 
141  /* Create memory object for result. */
142  result = afw_object_create_managed(p, xctx);
143 
144  /* Get journal interface. */
145  journal = impl_get_journal_interface(adaptor_id, false, xctx);
146 
149  /* Get first entry. */
150  afw_memory_clear(&impl_request);
151  afw_adaptor_journal_get_entry(journal, &impl_request,
152  afw_adaptor_journal_option_get_first, NULL, NULL, -1, result, xctx);
153 
154  /* Return result object. */
155  return result;
156 }
157 
158 
159 /* Journal - get entry at cursor. */
160 AFW_DEFINE(const afw_object_t *)
162  const afw_utf8_t *adaptor_id,
163  const afw_utf8_t *cursor,
164  const afw_pool_t *p,
165  afw_xctx_t *xctx)
166 {
167  const afw_adaptor_journal_t *journal;
168  const afw_object_t *result;
169  afw_adaptor_impl_request_t impl_request;
170 
171  /* Create memory object for result. */
172  result = afw_object_create_managed(p, xctx);
173 
174  /* Get journal interface. */
175  journal = impl_get_journal_interface(adaptor_id, false, xctx);
176 
177 
180  /* Get first entry. */
181  afw_memory_clear(&impl_request);
182  afw_adaptor_journal_get_entry(journal, &impl_request,
183  afw_adaptor_journal_option_get_by_cursor, NULL, cursor, -1, result, xctx);
184 
185  /* Return result object. */
186  return result;
187 }
188 
189 
190 /* Journal - get next entry after cursor. */
191 AFW_DEFINE(const afw_object_t *)
193  const afw_utf8_t *adaptor_id,
194  const afw_utf8_t *cursor,
195  const afw_pool_t *p,
196  afw_xctx_t *xctx)
197 {
198  const afw_adaptor_journal_t *journal;
199  const afw_object_t *result;
200  afw_adaptor_impl_request_t impl_request;
201 
202  /* Create memory object for result. */
203  result = afw_object_create_managed(p, xctx);
204 
205  /* Get journal interface. */
206  journal = impl_get_journal_interface(adaptor_id, false, xctx);
207 
208 
211  /* Get first entry. */
212  afw_memory_clear(&impl_request);
213  afw_adaptor_journal_get_entry(journal, &impl_request,
214  afw_adaptor_journal_option_get_next_after_cursor, NULL, cursor, -1, result,
215  xctx);
216 
217  /* Return result object. */
218  return result;
219 }
220 
221 
222 /* Journal - get next entry for consumer. */
223 AFW_DEFINE(const afw_object_t *)
225  const afw_utf8_t *adaptor_id,
226  const afw_utf8_t *consumer_id,
227  afw_size_t limit,
228  const afw_pool_t *p,
229  afw_xctx_t *xctx)
230 {
231  const afw_adaptor_journal_t *journal;
232  const afw_object_t *result;
233  afw_adaptor_impl_request_t impl_request;
234 
235  /* Create memory object for result. */
236  result = afw_object_create_managed(p, xctx);
237 
238  /* Get journal interface. */
239  journal = impl_get_journal_interface(adaptor_id, true, xctx);
240 
241 
244  /* Get first entry. */
245  afw_memory_clear(&impl_request);
246  afw_adaptor_journal_get_entry(journal, &impl_request,
248  limit, result, xctx);
249 
250  /* Return result object. */
251  return result;
252 }
253 
254 
255 /* Journal - get next entry after cursor for consumer. */
256 AFW_DEFINE(const afw_object_t *)
258  const afw_utf8_t *adaptor_id,
259  const afw_utf8_t *consumer_id,
260  const afw_utf8_t *cursor,
261  afw_size_t limit,
262  const afw_pool_t *p,
263  afw_xctx_t *xctx)
264 {
265  const afw_adaptor_journal_t *journal;
266  const afw_object_t *result;
267  afw_adaptor_impl_request_t impl_request;
268 
269  /* Create memory object for result. */
270  result = afw_object_create_managed(p, xctx);
271 
272  /* Get journal interface. */
273  journal = impl_get_journal_interface(adaptor_id, true, xctx);
274 
275 
278  /* Get first entry. */
279  afw_memory_clear(&impl_request);
280  afw_adaptor_journal_get_entry(journal, &impl_request,
282  consumer_id, cursor, limit, result, xctx);
283 
284  /* Return result object. */
285  return result;
286 }
287 
288 
289 /* Journal - advance cursor for consumer. */
290 AFW_DEFINE(const afw_object_t *)
292  const afw_utf8_t *adaptor_id,
293  const afw_utf8_t *consumer_id,
294  afw_size_t limit,
295  const afw_pool_t *p,
296  afw_xctx_t *xctx)
297 {
298  const afw_adaptor_journal_t *journal;
299  const afw_object_t *result;
300  afw_adaptor_impl_request_t impl_request;
301 
302  /* Create memory object for result. */
303  result = afw_object_create_managed(p, xctx);
304 
305  /* Get journal interface. */
306  journal = impl_get_journal_interface(adaptor_id, true, xctx);
307 
308 
311  /* Get first entry. */
312  afw_memory_clear(&impl_request);
313  afw_adaptor_journal_get_entry(journal, &impl_request,
315  consumer_id, NULL, limit, result, xctx);
316 
317  /* Return result object. */
318  return result;
319 }
320 
321 /* Journal - mark entry consumed by consumer. */
322 AFW_DEFINE(void)
324  const afw_utf8_t *adaptor_id,
325  const afw_utf8_t *consumer_id,
326  const afw_utf8_t *cursor,
327  const afw_pool_t *p,
328  afw_xctx_t *xctx)
329 {
330  const afw_adaptor_journal_t *journal;
331  afw_adaptor_impl_request_t impl_request;
332 
333  /* Get journal interface. */
334  journal = impl_get_journal_interface(adaptor_id, true, xctx);
335 
338  /* Get first entry. */
339  afw_memory_clear(&impl_request);
340  afw_adaptor_journal_mark_entry_consumed(journal, &impl_request,
341  consumer_id, cursor, xctx);
342 }
343 
344 
345 const afw_object_t *
347  const afw_adaptor_session_t *session,
348  const afw_utf8_t *object_id,
349  const afw_object_t *journal_entry,
350  afw_xctx_t *xctx)
351 {
353  const afw_utf8_t *consumer_id;
354  const afw_utf8_t *entry_cursor;
355  afw_size_t limit;
356  const afw_utf8_octet_t *s;
357  const afw_utf8_octet_t *c;
358  const afw_utf8_z_t *option_z;
359  const afw_utf8_z_t *syntax_z;
360  afw_size_t len;
361  const afw_adaptor_journal_t *journal;
362  const afw_object_t *request;
363  afw_adaptor_impl_request_t impl_request;
364  afw_boolean_t limit_applies;
365 
366  /*
367  * Get request object. Make one if necessary. Additional properties
368  * will be set for the get_object() properties.
369  */
370  request = afw_object_old_get_property_as_object(journal_entry, &afw_s_request,
371  xctx);
372  if (!request) {
373  request = afw_object_create_embedded(
374  journal_entry, &afw_s_request, xctx);
375  }
376 
377  /* Set default limit. */
378  limit_applies = false;
379  limit = 100;
381  consumer_id = NULL;
382  entry_cursor = NULL;
383  journal = afw_adaptor_session_get_journal_interface(session, xctx);
384  if (!journal) return NULL;
387  /* get_first */
388  if (afw_utf8_starts_with_z(object_id, "get_first")) {
389  syntax_z = "get_first";
391  option_z = "get_first";
392  if (strlen("get_first") != object_id->len) goto error_special_id;
393  }
394 
395  /* get_by_cursor:<event_cursor> */
396  else if (afw_utf8_starts_with_z(object_id, "get_by_cursor:")) {
397  syntax_z = "get_by_cursor:<event_cursor>";
399  option_z = "get_by_cursor";
400  entry_cursor = afw_utf8_create(
401  object_id->s + strlen("get_by_cursor:"),
402  object_id->len - strlen("get_by_cursor:"),
403  request->p, xctx);
404  if (entry_cursor->len == 0) goto error_special_id;
405  }
406 
407  /* get_next_after_cursor:<event_cursor> */
408  else if (afw_utf8_starts_with_z(object_id, "get_next_after_cursor:")) {
409  syntax_z = "get_next_after_cursor:<event_cursor>";
411  option_z = "get_next_after_cursor";
412  entry_cursor = afw_utf8_create(
413  object_id->s + strlen("get_next_after_cursor:"),
414  object_id->len - strlen("get_next_after_cursor:"),
415  request->p, xctx);
416  if (entry_cursor->len == 0) goto error_special_id;
417  }
418 
419  /* get_next_for_consumer:<consumer_id> */
420  else if (afw_utf8_starts_with_z(object_id, "get_next_for_consumer:"))
421  {
422  limit_applies = true;
423  syntax_z = "get_next_for_consumer:<consumer_id>";
425  option_z = "get_next_for_consumer";
426  consumer_id = afw_utf8_create(
427  object_id->s + strlen("get_next_for_consumer:"),
428  object_id->len - strlen("get_next_for_consumer:"),
429  request->p, xctx);
430  if (consumer_id->len == 0) goto error_special_id;
431  }
432 
433  /* get_next_for_consumer_after_cursor:<consumer_id>:<event_cursor> */
434  else if (afw_utf8_starts_with_z(object_id,
435  "get_next_for_consumer_after_cursor:"))
436  {
437  limit_applies = true;
438  syntax_z =
439  "get_next_for_consumer_after_cursor:<consumer_id>:<event_cursor>";
441  option_z = "get_next_for_consumer_after_cursor";
442  s = c = object_id->s + strlen("get_next_for_consumer_after_cursor:");
443  len = object_id->len - strlen("get_next_for_consumer_after_cursor:");
444  for (; len > 0; c++, len--) {
445  if (*c == ':') {
446  consumer_id = afw_utf8_create(s, c - s, request->p, xctx);
447  break;
448  }
449  }
450  if (len <= 0) goto error_special_id;
451  entry_cursor = afw_utf8_create(c + 1, len - 1, request->p, xctx);
452  if (entry_cursor->len == 0) goto error_special_id;
453  }
454 
455  /* advance_cursor_for_consumer:<consumer_id> */
456  else if (afw_utf8_starts_with_z(object_id,
457  "advance_cursor_for_consumer:"))
458  {
459  limit_applies = true;
460  syntax_z = "advance_cursor_for_consumer:<consumer_id>";
461  option =
463  option_z = "advance_cursor_for_consumer";
464  consumer_id = afw_utf8_create(
465  object_id->s + strlen("advance_cursor_for_consumer:"),
466  object_id->len - strlen("advance_cursor_for_consumer:"),
467  request->p, xctx);
468  if (consumer_id->len == 0) goto error_special_id;
469  }
470 
471  /* <event_cursor> */
472  else {
474  option_z = "get_by_cursor";
475  entry_cursor = object_id;
476  }
477 
478  /* Set more specific request function. */
479  afw_object_set_property_as_string(request, &afw_s_function,
480  &afw_s_a_journal_get_entry, xctx);
481 
482  /* Set option. */
484  &afw_s_option, option_z, xctx);
485 
486  /* Set entry consumerId property, if applicable. */
487  if (consumer_id) {
488  afw_object_set_property_as_string(request, &afw_s_consumerId,
489  consumer_id, xctx);
490  }
491 
492  /* Set entry consumerId property, if applicable. */
493  if (entry_cursor) {
494  afw_object_set_property_as_string(request, &afw_s_entryCursor,
495  entry_cursor, xctx);
496  }
497 
498  /* Set entry limit property, if applicable. */
499  if (limit_applies) {
500  afw_object_set_property_as_integer(request, &afw_s_limit,
501  limit, xctx);
502  } else {
503  limit = 1;
504  }
505 
506  /* Get entry and return. */
507  afw_memory_clear(&impl_request);
508  afw_adaptor_journal_get_entry(journal, &impl_request,
509  option, consumer_id, entry_cursor, limit, journal_entry, xctx);
510  afw_object_set_property_as_string(journal_entry,
511  &afw_s_status, &afw_s_success, xctx);
512  return journal_entry;
513 
514 error_special_id:
515  AFW_THROW_ERROR_FZ(general, xctx,
516  "Expecting special objectId in the form: %s", syntax_z);
517 }
AFW_DEFINE(const afw_object_t *)
Adaptive Framework Core Internal.
const afw_object_t * afw_adaptor_internal_journal_get_entry(const afw_adaptor_session_t *session, const afw_utf8_t *object_id, const afw_object_t *journal_entry, afw_xctx_t *xctx)
void afw_adaptor_internal_journal_epilogue(const afw_adaptor_session_t *session, const afw_object_t *journal_entry, afw_boolean_t modification, afw_xctx_t *xctx)
#define afw_adaptor_journal_add_entry(instance, impl_request, entry, xctx)
Call method add_entry of interface afw_adaptor_journal.
#define afw_adaptor_journal_mark_entry_consumed(instance, impl_request, consumer_id, entry_cursor, xctx)
Call method mark_entry_consumed of interface afw_adaptor_journal.
#define afw_adaptor_journal_get_entry(instance, impl_request, option, consumer_id, entry_cursor, limit, response, xctx)
Call method get_entry of interface afw_adaptor_journal.
#define afw_adaptor_session_get_journal_interface(instance, xctx)
Call method get_journal_interface of interface afw_adaptor_session.
afw_adaptor_journal_get_by_cursor(const afw_utf8_t *adaptor_id, const afw_utf8_t *cursor, const afw_pool_t *p, afw_xctx_t *xctx)
Journal - get entry at cursor.
afw_adaptor_journal_get_first(const afw_utf8_t *adaptor_id, const afw_pool_t *p, afw_xctx_t *xctx)
Journal - get first entry.
afw_adaptor_journal_get_next_for_consumer_after_cursor(const afw_utf8_t *adaptor_id, const afw_utf8_t *consumer_id, const afw_utf8_t *cursor, afw_size_t limit, const afw_pool_t *p, afw_xctx_t *xctx)
Journal - get next entry after cursor for consumer.
afw_adaptor_journal_mark_consumed(const afw_utf8_t *adaptor_id, const afw_utf8_t *consumer_id, const afw_utf8_t *cursor, const afw_pool_t *p, afw_xctx_t *xctx)
Journal - mark entry consumed by consumer.
afw_adaptor_journal_get_next_for_consumer(const afw_utf8_t *adaptor_id, const afw_utf8_t *consumer_id, afw_size_t limit, const afw_pool_t *p, afw_xctx_t *xctx)
Journal - get next entry for consumer.
afw_adaptor_session_get_cached(const afw_utf8_t *adaptor_id, afw_boolean_t begin_transaction, afw_xctx_t *xctx)
Get/create an active cached session for adaptor_id.
Definition: afw_adaptor.c:375
afw_adaptor_journal_advance_cursor_for_consumer(const afw_utf8_t *adaptor_id, const afw_utf8_t *consumer_id, afw_size_t limit, const afw_pool_t *p, afw_xctx_t *xctx)
Journal - advance cursor for consumer.
afw_adaptor_journal_get_next_after_cursor(const afw_utf8_t *adaptor_id, const afw_utf8_t *cursor, const afw_pool_t *p, afw_xctx_t *xctx)
Journal - get next entry after cursor.
#define afw_object_old_get_property_as_boolean(object, property_name, found, xctx)
Get property function for data type boolean value.
afw_object_set_property_as_integer(const afw_object_t *object, const afw_utf8_t *property_name, afw_integer_t internal, afw_xctx_t *xctx)
Set property function for data type integer values.
#define afw_object_old_get_property_as_object(object, property_name, xctx)
Get property function for data type object value.
#define afw_object_old_get_property_as_string(object, property_name, xctx)
Get property function for data type string value.
afw_object_set_property_as_string(const afw_object_t *object, const afw_utf8_t *property_name, const afw_utf8_t *internal, afw_xctx_t *xctx)
Set property function for data type string values.
#define AFW_UTF8_FMT_ARG(A_STRING)
Convenience Macro for use with AFW_UTF8_FMT to specify arg.
Definition: afw_common.h:605
_Bool afw_boolean_t
Definition: afw_common.h:373
#define AFW_UTF8_FMT
Format string specifier used for afw_utf8_t.
Definition: afw_common.h:588
afw_utf8_octet_t afw_utf8_z_t
NFC normalized UTF-8 null terminated string.
Definition: afw_common.h:523
char afw_utf8_octet_t
8 bits of utf-8 codepoint.
Definition: afw_common.h:236
apr_size_t afw_size_t
size_t.
Definition: afw_common.h:151
enum afw_adaptor_journal_option_e afw_adaptor_journal_option_t
Typedef for afw_adaptor_journal get_entry options enum.
@ afw_adaptor_journal_option_get_next_for_consumer_after_cursor
afw_adaptor_journal get_entry option get_next_for_consumer_after_cursor
Definition: afw_common.h:1118
@ afw_adaptor_journal_option_get_next_for_consumer
afw_adaptor_journal get_entry option get_next_for_consumer
Definition: afw_common.h:1095
@ afw_adaptor_journal_option_advance_cursor_for_consumer
afw_adaptor_journal get_entry option advance_cursor_for_consumer
Definition: afw_common.h:1145
@ afw_adaptor_journal_option_get_next_after_cursor
afw_adaptor_journal get_entry option get_next_after_cursor
Definition: afw_common.h:1060
@ afw_adaptor_journal_option_get_by_cursor
afw_adaptor_journal get_entry option get_by_cursor
Definition: afw_common.h:1049
@ afw_adaptor_journal_option_get_first
afw_adaptor_journal get_entry option get_first
Definition: afw_common.h:1037
#define AFW_THROW_ERROR_FZ(code, xctx, format_z,...)
Macro used to set error and 0 rv in xctx and throw it.
Definition: afw_error.h:319
#define AFW_THROW_ERROR_Z(code, message_z, xctx)
Macro used to set error and 0 rv in xctx and throw it.
Definition: afw_error.h:283
#define afw_memory_clear(to)
Clear preallocated memory for sizeof(*(to)).
Definition: afw_memory.h:47
#define afw_object_create_managed(p, xctx)
Create an empty entity object in its own pool.
Definition: afw_object.h:913
#define AFW_OBJECT_Q_OBJECT_TYPE_ID_JOURNAL_ENTRY
Quoted object type id for Journal Entry object.
Definition: afw_object.h:60
afw_object_set_property_as_string_from_utf8_z(const afw_object_t *instance, const afw_utf8_t *property_name, const afw_utf8_z_t *string_z, afw_xctx_t *xctx)
Set an string property from utf8_z.
Definition: afw_object.c:194
afw_object_set_property(const afw_object_t *instance, const afw_utf8_t *property_name, const afw_value_t *value, afw_xctx_t *xctx)
Set the value of an object's property.
Definition: afw_object.c:46
const afw_object_t * afw_object_create_embedded(const afw_object_t *embedding_object, const afw_utf8_t *property_name, afw_xctx_t *xctx)
Create an empty embedded object in a memory object.
afw_boolean_t afw_utf8_starts_with_z(const afw_utf8_t *string, const afw_utf8_z_t *starts_with_z)
Check to see if a string starts with a utf8_z string.
#define afw_utf8_create(s, len, p, xctx)
Create utf-8 string without copy unless necessary in pool specified.
Definition: afw_utf8.h:239
afw_value_create_dateTime_now_utc(const afw_pool_t *p, afw_xctx_t *xctx)
Create a dateTime value with current time.
Definition: afw_value.c:736
Internal request info used by afw_adaptor_impl*() functions.
const afw_utf8_t * journal_adaptor_id
Journal adaptor id (FIXME Going away)
Interface afw_adaptor_journal public struct.
Interface afw_adaptor_session public struct.
Interface afw_object public struct.
Interface afw_pool public struct.
NFC normalized UTF-8 string.
Definition: afw_common.h:545
Interface afw_value public struct.
Interface afw_xctx public struct.