Adaptive Framework  0.9.0
All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
afw_lmdb_journal.c
Go to the documentation of this file.
1 // See the 'COPYING' file in the project root for licensing information.
2 /*
3  * Implementation of afw_adaptor_journal interface for LMDB
4  *
5  * Copyright (c) 2010-2023 Clemson University
6  *
7  */
8 
9 
15 #include "afw.h"
16 #include "afw_lmdb.h"
17 #include "afw_lmdb_internal.h"
19 #include "afw_adaptor_impl.h"
20 #include "lmdb.h"
21 
22 /* Declares and rti/inf defines for interface afw_adaptor_journal */
23 #define AFW_IMPLEMENTATION_ID "lmdb"
25 
29  afw_xctx_t *xctx)
30 {
31  afw_lmdb_journal_t *self;
32 
34 
35  self->pub.inf = &impl_afw_adaptor_journal_inf;
36  self->session = (afw_adaptor_session_t *)session;
37 
38  return self;
39 }
40 
41 
42 /*
43  * Implementation of method add_entry of interface afw_adaptor_journal.
44  */
45 const afw_utf8_t *
47  const afw_adaptor_journal_t * instance,
48  const afw_adaptor_impl_request_t * impl_request,
49  const afw_object_t * entry,
50  afw_xctx_t *xctx)
51 {
52  /* Assign instance pointer to self. */
53  afw_lmdb_journal_t * self =
54  (afw_lmdb_journal_t *)instance;
55  afw_lmdb_adaptor_session_t *session =
56  (afw_lmdb_adaptor_session_t *)self->session;
57  afw_lmdb_adaptor_t *adaptor =
58  (afw_lmdb_adaptor_t *)session->adaptor;
59  const afw_utf8_t *token;
60  const afw_memory_t *entry_string;
61  MDB_dbi dbi;
62  MDB_val key, data;
63  MDB_cursor *cursor;
64  apr_uint64_t t;
65  int rc;
66 
67  AFW_LMDB_BEGIN_TRANSACTION(adaptor, session, 0, false, xctx) {
68 
69  /* open the journal database */
70  dbi = afw_lmdb_internal_open_database(adaptor,
71  AFW_LMDB_GET_TRANSACTION(),
72  &afw_lmdb_s_Journal, MDB_CREATE, xctx->p, xctx);
73 
74  /* first get the last entry in the database */
75  cursor = afw_lmdb_internal_open_cursor(session, dbi, xctx);
76  if (cursor == NULL) {
77  /* unable to get a cursor over the journal database */
78  AFW_THROW_ERROR_Z(general,
79  "Error getting cursor for journal database.", xctx);
80  }
81 
82  rc = mdb_cursor_get(cursor, &key, &data, MDB_LAST);
83  if (rc == MDB_NOTFOUND) {
84  /* no entries here */
85  t = 0;
86  key.mv_data = (void *) &t;
87  key.mv_size = sizeof(t);
88  } else if (rc == 0) {
89  t = *((apr_uint64_t *)(key.mv_data));
90  AFW_ENDIAN_BIG_TO_NATIVE_64(&t);
91  }
92 
93  /* increment the key for the next entry */
94  t++;
95  AFW_ENDIAN_NATIVE_TO_BIG_64(&t);
96  key.mv_data = &t;
97 
98  /* store the entry into the next data pointer */
99  entry_string = afw_content_type_object_to_raw(
100  adaptor->ubjson, entry, NULL, xctx->p, xctx);
101  data.mv_data = (void *)entry_string->ptr;
102  data.mv_size = entry_string->size;
103 
104  rc = mdb_cursor_put(cursor, &key, &data, MDB_APPEND);
105  if (rc) {
106  /* error appending key/data to our journal database */
107  AFW_THROW_ERROR_RV_Z(general, lmdb_journal, rc,
108  "Error appending journal entry database.", xctx);
109  }
110 
111  mdb_cursor_close(cursor);
112 
114  }
116 
117  /* return the new integer id */
118  AFW_ENDIAN_BIG_TO_NATIVE_64(&t);
119 
120  token = afw_utf8_printf(xctx->p, xctx, "%lu", t);
121 
122  return token;
123 }
124 
125 const afw_object_t *
126 afw_lmdb_adaptor_journal_get_peer_object(
127  afw_lmdb_journal_t * self,
128  afw_lmdb_adaptor_session_t * session,
129  afw_lmdb_adaptor_t * adaptor,
130  MDB_dbi dbi,
131  MDB_txn * txn,
132  const afw_uuid_t *uuid,
133  afw_xctx_t *xctx)
134 {
135  const afw_object_t *object = NULL;
136  const afw_value_t *value;
137  afw_memory_t raw;
138  MDB_val key, data;
139 
140  memset(&data, 0, sizeof(MDB_val));
141  key.mv_data = (void*)uuid;
142  key.mv_size = sizeof(afw_uuid_t);
143 
144  if (mdb_get(txn, dbi, &key, &data) == 0) {
145  raw.size = data.mv_size;
146  raw.ptr = data.mv_data;
147 
148  value = afw_content_type_raw_to_value(adaptor->ubjson, &raw, NULL,
149  xctx->p, xctx);
150 
151  if (value && afw_value_is_object(value)) {
152  object = afw_value_as_object(value, xctx);
153  }
154  } else {
155  /* no entry found */
156  }
157 
158  /* return a copy of the object that we may need to modify */
159  return object;
160 }
161 
162 void
163 afw_lmdb_journal_update_peer(
164  afw_lmdb_journal_t * self,
165  afw_lmdb_adaptor_session_t * session,
166  afw_lmdb_adaptor_t *adaptor,
167  MDB_dbi dbi,
168  const afw_uuid_t *uuid,
169  const afw_object_t *updated_object,
170  afw_xctx_t *xctx)
171 {
172  const afw_utf8_t *object_id;
173 
174  object_id = afw_uuid_to_utf8(uuid, xctx->p, xctx);
175 
176  afw_lmdb_internal_replace_entry_from_object(session,
177  &afw_s__AdaptiveJournalEntry_, object_id, updated_object,
178  dbi, xctx);
179 }
180 
181 const afw_object_t *
182 afw_lmdb_adaptor_journal_get_entry_object(
183  afw_lmdb_journal_t * self,
184  afw_lmdb_adaptor_session_t * session,
185  afw_lmdb_adaptor_t * adaptor,
186  MDB_dbi dbi,
187  MDB_txn * txn,
188  apr_uint64_t cursor,
189  afw_xctx_t *xctx)
190 {
191  const afw_object_t *object = NULL;
192  const afw_value_t *value;
193  afw_memory_t raw;
194  MDB_val key, data;
195 
196  /* The journal keys are in Big Endian for portability */
197  AFW_ENDIAN_NATIVE_TO_BIG_64(&cursor);
198 
199  memset(&data, 0, sizeof(MDB_val));
200  key.mv_data = (void *)&cursor;
201  key.mv_size = sizeof(cursor);
202 
203  if (mdb_get(txn, dbi, &key, &data) == 0) {
204  raw.size = data.mv_size;
205  raw.ptr = data.mv_data;
206 
207  value = afw_content_type_raw_to_value(adaptor->ubjson, &raw, NULL,
208  xctx->p, xctx);
209 
210  if (value && afw_value_is_object(value)) {
211  object = afw_value_as_object(value, xctx);
212  }
213  } else {
214  /* no entry found */
215  }
216 
217  /* return a copy of the object that we may need to modify */
218  return object;
219 }
220 
221 void
222 afw_lmdb_journal_get_first(
223  afw_lmdb_journal_t * self,
225  afw_lmdb_adaptor_t * adaptor,
226  MDB_dbi dbiJournal,
227  MDB_txn * txn,
228  const afw_object_t * response,
229  afw_xctx_t *xctx)
230 {
231  const afw_object_t *entry;
232  apr_uint64_t cursor;
233 
234  /* each entry is represented numerically */
235  cursor = 1;
236 
237  entry = afw_lmdb_adaptor_journal_get_entry_object(
238  self, session, adaptor, dbiJournal, txn, cursor, xctx);
239  if (entry) {
240  afw_object_set_property_as_string(response, &afw_s_entryCursor,
241  afw_utf8_printf(xctx->p, xctx, "%lu", cursor), xctx);
242 
243  /* Set entry property in response. */
244  afw_object_set_property_as_object(response, &afw_s_entry, entry,
245  xctx);
246  }
247 }
248 
249 void
250 afw_lmdb_journal_get_by_cursor(
251  afw_lmdb_journal_t * self,
253  afw_lmdb_adaptor_t * adaptor,
254  MDB_dbi dbiJournal,
255  MDB_txn * txn,
256  const afw_utf8_t * entry_cursor,
257  const afw_object_t * response,
258  afw_xctx_t *xctx)
259 {
260  const afw_object_t *entry;
261  apr_uint64_t cursor;
262 
263  cursor = apr_strtoi64(
264  afw_utf8_to_utf8_z(entry_cursor, xctx->p, xctx), NULL, 10);
265 
266  entry = afw_lmdb_adaptor_journal_get_entry_object(
267  self, session, adaptor, dbiJournal, txn, cursor, xctx);
268  if (entry) {
269  afw_object_set_property_as_string(response, &afw_s_entryCursor,
270  afw_utf8_printf(xctx->p, xctx, "%lu", cursor), xctx);
271 
272  /* Set entry property in response. */
273  afw_object_set_property_as_object(response, &afw_s_entry, entry,
274  xctx);
275  }
276 }
277 
278 void
279 afw_lmdb_journal_get_next_after_cursor(
280  afw_lmdb_journal_t * self,
282  afw_lmdb_adaptor_t * adaptor,
283  MDB_dbi dbiJournal,
284  MDB_txn * txn,
285  const afw_utf8_t * entry_cursor,
286  afw_size_t limit,
287  const afw_object_t * response,
288  afw_xctx_t *xctx)
289 {
290  const afw_object_t *entry;
291  apr_uint64_t cursor;
292 
293  /* set our cursor to one after the entry_cursor */
294  cursor = apr_strtoi64(
295  afw_utf8_to_utf8_z(entry_cursor, xctx->p, xctx), NULL, 10) + 1;
296 
297  entry = afw_lmdb_adaptor_journal_get_entry_object(
298  self, session, adaptor, dbiJournal, txn, cursor, xctx);
299  if (entry) {
300  afw_object_set_property_as_string(response, &afw_s_entryCursor,
301  afw_utf8_printf(xctx->p, xctx, "%lu", cursor), xctx);
302 
303  /* Set entry property in response. */
304  afw_object_set_property_as_object(response, &afw_s_entry, entry,
305  xctx);
306  }
307 }
308 
/*
 * Consumer-aware journal lookup that starts scanning at a caller-supplied
 * cursor. The parameter list (consumer_id, entry_cursor, limit) matches
 * afw_lmdb_journal_get_next_for_consumer_after_cursor() in this page's
 * symbol index.
 *
 * NOTE(review): this listing has dropped several lines of the function (the
 * listing numbers jump): the name line (310), the session parameter (312),
 * and a number of call lines in the body. Restore them from the real source
 * before editing the code.
 *
 * Flow visible here: parse entry_cursor, load the consumer's peer object
 * from the Primary database (throwing if absent), scan journal entries
 * upward from the cursor, then record lastContactTime and write the peer
 * object back.
 */
309 void
311  afw_lmdb_journal_t * self,
313  afw_lmdb_adaptor_t * adaptor,
314  MDB_dbi dbiJournal,
315  MDB_txn * txn,
316  const afw_utf8_t * consumer_id,
317  const afw_utf8_t * entry_cursor,
318  afw_size_t limit,
319  const afw_object_t * response,
320  afw_xctx_t *xctx)
321 {
322  const afw_object_t *entry = NULL;
323  MDB_dbi dbiConsumers;
324  apr_uint64_t cursor;
325  const afw_uuid_t *uuid;
326  const afw_object_t *peer;
327  const afw_utf8_t *advance_cursor;
328  const afw_utf8_t *cursor_str;
329  const afw_value_t *consumer_filter;
330  afw_boolean_t found = AFW_FALSE;
331  const afw_dateTime_t *now;
332  afw_size_t i;
333 
334  /* In this routine, we can simply start with entry_cursor
335  and work our way up until we find the next entry */
336  cursor = apr_strtoi64(
337  afw_utf8_to_utf8_z(entry_cursor, xctx->p, xctx), NULL, 10);
338 
339  dbiConsumers = afw_lmdb_internal_open_database(session->adaptor,
340  txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
341 
342  /* lookup the cursor from the consumer database */
343  uuid = afw_uuid_from_utf8(consumer_id, xctx->p, xctx);
344 
345  peer = afw_lmdb_adaptor_journal_get_peer_object(
346  self, session, adaptor, dbiConsumers, txn, uuid, xctx);
347  if (peer == NULL) {
348  AFW_THROW_ERROR_Z(general,
349  "Error, provisioning peer not found.", xctx);
350  }
351 
352  advance_cursor = afw_object_old_get_property_as_utf8(
353  peer, &afw_s_advanceCursor, xctx->p, xctx);
354  consumer_filter = afw_object_get_property(
355  peer, &afw_s_consumeFilter, xctx);
356 
357  /* Scan our journal until we find an applicable entry, or hit our limit */
/* NOTE(review): with `||` the loop keeps running while nothing has been
   found even after i reaches limit, and after a match it keeps re-reading
   the same entry until i reaches limit. The comment above suggests `&&`
   was intended - confirm, including what limit == 0 should mean. */
358  for (i = 0; (i < limit) || !found; i++) {
359  entry = afw_lmdb_adaptor_journal_get_entry_object(self,
360  session, adaptor, dbiJournal, txn, cursor, xctx);
361 
362  if (entry) {
363  /* Now, determine if it's applicable */
/* NOTE(review): listing line 364 (the start of this `if`) is missing;
   presumably a call to afw_adaptor_impl_is_journal_entry_applicable(),
   whose parameters match the arguments below - confirm. */
365  (const afw_adaptor_journal_t *)self,
366  entry, peer, &consumer_filter, xctx)) {
367  found = AFW_TRUE;
368  } else cursor++;
369 
370  } else {
371  /* No more entries left, so we're done */
372  break;
373  }
374  }
375 
376  /* make a string out of the cursor */
/* NOTE(review): cursor is apr_uint64_t but is formatted with "%lu"; that
   only matches where unsigned long is 64 bits. APR_UINT64_T_FMT looks like
   the portable choice - confirm. */
377  cursor_str = afw_utf8_printf(xctx->p, xctx, "%lu", cursor);
378 
379  /* update our last contact time */
380  now = afw_dateTime_now_utc(xctx->p, xctx);
381 
382  if (found) {
383  /* set our consumption properties */
/* NOTE(review): the opening line of each call below (listing lines 384,
   386, 388, 390, 394, 398) is missing; only the argument tails remain. */
385  &afw_s_consumeStartTime, now, xctx);
387  &afw_s_consumeCursor, cursor_str, xctx);
389  &afw_s_currentCursor, cursor_str, xctx);
391  &afw_s_advanceCursor, xctx);
392 
393  /* set our entry cursor */
395  &afw_s_entryCursor, cursor_str, xctx);
396 
397  /* set the entry to be returned */
399  &afw_s_entry, entry, xctx);
400  } else {
401  /* we may need to increase the advanceCursor */
402  if (advance_cursor) {
/* NOTE(review): listing lines 403-404 are missing and cannot be inferred
   from this page; the content of this branch is unknown. */
405  } else {
407  &afw_s_advanceCursor, cursor_str, xctx);
408  }
409  }
410 
/* NOTE(review): listing line 411 (start of the call that stores
   lastContactTime) is missing. */
412  &afw_s_lastContactTime, now, xctx);
413 
414  /* update the peer object */
415  afw_lmdb_journal_update_peer(self, session, adaptor,
416  dbiConsumers, uuid, peer, xctx);
417 }
418 
/*
 * Consumer-aware journal lookup that picks its starting cursor from the
 * consumer's peer object and, on a match, returns the entry in response.
 * Presumably impl_afw_adaptor_journal_get_next_for_consumer() from this
 * page's symbol index (the name line, listing line 420, is missing) -
 * confirm against the real source.
 *
 * NOTE(review): this listing has dropped several lines of the function (the
 * listing numbers jump): the name line, the session parameter (422), lines
 * 467-470, and a number of call lines in the body. Restore them from the
 * real source before editing the code.
 *
 * Starting cursor, in priority order: consumeCursor (an entry already handed
 * out, so a match is flagged as a reissue), else advanceCursor, else
 * currentCursor + 1, else 1.
 */
419 void
421  afw_lmdb_journal_t * self,
423  afw_lmdb_adaptor_t *adaptor,
424  MDB_dbi dbiJournal,
425  MDB_txn * txn,
426  const afw_utf8_t * consumer_id,
427  afw_size_t limit,
428  const afw_object_t * response,
429  afw_xctx_t *xctx)
430 {
431  const afw_object_t *entry = NULL;
432  apr_uint64_t cursor;
433  MDB_dbi dbiConsumers;
434  const afw_uuid_t *uuid;
435  const afw_object_t *peer;
436  const afw_utf8_t *current_cursor;
437  const afw_utf8_t *advance_cursor;
438  const afw_utf8_t *consume_cursor;
439  const afw_utf8_t *cursor_str;
440  const afw_value_t *consumer_filter;
441  afw_boolean_t found = AFW_FALSE;
442  const afw_dateTime_t *now;
443  afw_size_t i;
444 
445  dbiConsumers = afw_lmdb_internal_open_database(session->adaptor,
446  txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
447 
448  /* lookup the cursor from the consumer database */
449  uuid = afw_uuid_from_utf8(consumer_id, xctx->p, xctx);
450 
451  peer = afw_lmdb_adaptor_journal_get_peer_object(
452  self, session, adaptor, dbiConsumers, txn, uuid, xctx);
453  if (peer == NULL) {
454  AFW_THROW_ERROR_Z(general,
455  "Error, provisioning peer not found.", xctx);
456  }
457 
458  current_cursor = afw_object_old_get_property_as_utf8(
459  peer, &afw_s_currentCursor, xctx->p, xctx);
460  advance_cursor = afw_object_old_get_property_as_utf8(
461  peer, &afw_s_advanceCursor, xctx->p, xctx);
462  consumer_filter = afw_object_get_property(
463  peer, &afw_s_consumeFilter, xctx);
464  consume_cursor = afw_object_old_get_property_as_string(
465  peer, &afw_s_consumeCursor, xctx);
466 
/* Choose where to start scanning; see the priority order in the header. */
471  if (consume_cursor)
472  cursor = apr_strtoi64(
473  afw_utf8_to_utf8_z(consume_cursor, xctx->p, xctx), NULL, 10);
474  else if (advance_cursor)
475  cursor = apr_strtoi64(
476  afw_utf8_to_utf8_z(advance_cursor, xctx->p, xctx), NULL, 10);
477  else if (current_cursor)
478  cursor = apr_strtoi64(
479  afw_utf8_to_utf8_z(current_cursor, xctx->p, xctx), NULL, 10) + 1;
480  else /* start at the beginning */
481  cursor = 1;
482 
483  /* Scan our journal until we find an applicable entry, or hit our limit */
/* NOTE(review): with `||` the loop keeps running while nothing has been
   found even after i reaches limit, and after a match it keeps re-reading
   the same entry until i reaches limit. The comment above suggests `&&`
   was intended - confirm, including what limit == 0 should mean. */
484  for (i = 0; (i < limit) || !found; i++) {
485  entry = afw_lmdb_adaptor_journal_get_entry_object(self,
486  session, adaptor, dbiJournal, txn, cursor, xctx);
487 
488  if (entry) {
489  /* Now, determine if it's applicable */
/* NOTE(review): listing line 490 (the start of this `if`) is missing;
   presumably a call to afw_adaptor_impl_is_journal_entry_applicable(),
   whose parameters match the arguments below - confirm. */
491  (const afw_adaptor_journal_t *)self,
492  entry, peer, &consumer_filter, xctx)) {
493  found = AFW_TRUE;
494  } else cursor++;
495 
496  } else {
497  /* No more entries left, so we're done */
498  break;
499  }
500  }
501 
502  /* make a string out of the cursor */
/* NOTE(review): cursor is apr_uint64_t but is formatted with "%lu"; that
   only matches where unsigned long is 64 bits. APR_UINT64_T_FMT looks like
   the portable choice - confirm. */
503  cursor_str = afw_utf8_printf(xctx->p, xctx, "%lu", cursor);
504 
505  /* update our last contact time */
506  now = afw_dateTime_now_utc(xctx->p, xctx);
507 
508  if (found) {
509  /* check to see if this is a re-issue */
510  if (consume_cursor) {
511  afw_object_set_property_as_boolean(response, &afw_s_reissue,
512  true, xctx);
513  } else {
514  /* not a re-issue, so set our consumption properties */
/* NOTE(review): the opening line of each call below (listing lines 515,
   517, 519, 521, 526, 530) is missing; only the argument tails remain. */
516  &afw_s_consumeStartTime, now, xctx);
518  &afw_s_consumeCursor, cursor_str, xctx);
520  &afw_s_currentCursor, cursor_str, xctx);
522  &afw_s_advanceCursor, xctx);
523  }
524 
525  /* set our entry cursor */
527  &afw_s_entryCursor, cursor_str, xctx);
528 
529  /* set the entry to be returned */
531  &afw_s_entry, entry, xctx);
532  } else {
533  /* we may need to increase the advanceCursor */
534  if (advance_cursor) {
/* NOTE(review): listing lines 535-536 are missing and cannot be inferred
   from this page; the content of this branch is unknown. */
537  } else {
539  &afw_s_advanceCursor, cursor_str, xctx);
540  }
541  }
542 
/* NOTE(review): listing line 543 (start of the call that stores
   lastContactTime) is missing. */
544  &afw_s_lastContactTime, now, xctx);
545 
546  /* update the peer object */
547  afw_lmdb_journal_update_peer(self, session, adaptor,
548  dbiConsumers, uuid, peer, xctx);
549 }
550 
/*
 * Consumer-aware journal scan that updates the consumer's cursors and sets
 * entryCursor on a match, but - unlike the sibling lookup above it in this
 * file - has no visible line that stores the entry itself in response.
 * Presumably afw_lmdb_journal_advance_cursor_for_consumer() from this page's
 * symbol index (the name line, listing line 552, is missing) - confirm
 * against the real source.
 *
 * NOTE(review): this listing has dropped several lines of the function (the
 * listing numbers jump): the name line, lines 599-602, and a number of call
 * lines in the body. Restore them from the real source before editing the
 * code.
 *
 * Starting cursor, in priority order: consumeCursor (a match is then flagged
 * as a reissue), else advanceCursor, else currentCursor + 1, else 1.
 */
551 void
553  afw_lmdb_journal_t * self,
554  afw_lmdb_adaptor_session_t * session,
555  afw_lmdb_adaptor_t * adaptor,
556  MDB_dbi dbiJournal,
557  MDB_txn * txn,
558  const afw_utf8_t * consumer_id,
559  afw_size_t limit,
560  const afw_object_t * response,
561  afw_xctx_t *xctx)
562 {
563  const afw_object_t *entry;
564  apr_uint64_t cursor;
565  MDB_dbi dbiConsumers;
566  const afw_uuid_t *uuid;
567  const afw_object_t *peer;
568  const afw_utf8_t *current_cursor;
569  const afw_utf8_t *advance_cursor;
570  const afw_utf8_t *consume_cursor;
571  const afw_utf8_t *cursor_str;
572  const afw_value_t *consumer_filter;
573  afw_boolean_t found = AFW_FALSE;
574  const afw_dateTime_t *now;
575  afw_size_t i;
576 
577  dbiConsumers = afw_lmdb_internal_open_database(session->adaptor,
578  txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
579 
580  /* lookup the cursor from the consumer database */
581  uuid = afw_uuid_from_utf8(consumer_id, xctx->p, xctx);
582 
583  peer = afw_lmdb_adaptor_journal_get_peer_object(
584  self, session, adaptor, dbiConsumers, txn, uuid, xctx);
585  if (peer == NULL) {
586  AFW_THROW_ERROR_Z(general,
587  "Error, provisioning peer not found.", xctx);
588  }
589 
590  current_cursor = afw_object_old_get_property_as_utf8(
591  peer, &afw_s_currentCursor, xctx->p, xctx);
592  advance_cursor = afw_object_old_get_property_as_utf8(
593  peer, &afw_s_advanceCursor, xctx->p, xctx);
594  consumer_filter = afw_object_get_property(
595  peer, &afw_s_consumeFilter, xctx);
596  consume_cursor = afw_object_old_get_property_as_string(
597  peer, &afw_s_consumeCursor, xctx);
598 
/* Choose where to start scanning; see the priority order in the header. */
603  if (consume_cursor)
604  cursor = apr_strtoi64(
605  afw_utf8_to_utf8_z(consume_cursor, xctx->p, xctx), NULL, 10);
606  else if (advance_cursor)
607  cursor = apr_strtoi64(
608  afw_utf8_to_utf8_z(advance_cursor, xctx->p, xctx), NULL, 10);
609  else if (current_cursor)
610  cursor = apr_strtoi64(
611  afw_utf8_to_utf8_z(current_cursor, xctx->p, xctx), NULL, 10) + 1;
612  else /* start at the beginning */
613  cursor = 1;
614 
615  /* Scan our journal until we find an applicable entry, or hit our limit */
/* NOTE(review): with `||` the loop keeps running while nothing has been
   found even after i reaches limit, and after a match it keeps re-reading
   the same entry until i reaches limit. The comment above suggests `&&`
   was intended - confirm, including what limit == 0 should mean. */
616  for (i = 0; (i < limit) || !found; i++) {
617  entry = afw_lmdb_adaptor_journal_get_entry_object(self,
618  session, adaptor, dbiJournal, txn, cursor, xctx);
619 
620  if (entry) {
621  /* Now, determine if it's applicable */
/* NOTE(review): listing line 622 (the start of this `if`) is missing;
   presumably a call to afw_adaptor_impl_is_journal_entry_applicable(),
   whose parameters match the arguments below - confirm. */
623  (const afw_adaptor_journal_t *)self,
624  entry, peer, &consumer_filter, xctx)) {
625  found = AFW_TRUE;
626  } else cursor++;
627 
628  } else {
629  /* No more entries left, so we're done */
630  break;
631  }
632  }
633 
634  /* make a string out of the cursor */
/* NOTE(review): cursor is apr_uint64_t but is formatted with "%lu"; that
   only matches where unsigned long is 64 bits. APR_UINT64_T_FMT looks like
   the portable choice - confirm. */
635  cursor_str = afw_utf8_printf(xctx->p, xctx, "%lu", cursor);
636 
637  /* update our last contact time */
638  now = afw_dateTime_now_utc(xctx->p, xctx);
639 
640  if (found) {
641  /* check to see if this is a re-issue */
642  if (consume_cursor) {
643  afw_object_set_property_as_boolean(response, &afw_s_reissue,
644  true, xctx);
645  } else {
646  /* not a re-issue, so set our consumption properties */
/* NOTE(review): the opening line of each call below (listing lines 647,
   649, 651, 653, 658) is missing; only the argument tails remain. */
648  &afw_s_consumeStartTime, now, xctx);
650  &afw_s_consumeCursor, cursor_str, xctx);
652  &afw_s_currentCursor, cursor_str, xctx);
654  &afw_s_advanceCursor, xctx);
655  }
656 
657  /* set our entry cursor */
659  &afw_s_entryCursor, cursor_str, xctx);
660  } else {
661  /* we may need to increase the advanceCursor */
662  if (advance_cursor) {
/* NOTE(review): listing lines 663-664 are missing and cannot be inferred
   from this page; the content of this branch is unknown. */
665  } else {
667  &afw_s_advanceCursor, cursor_str, xctx);
668  }
669  }
670 
/* NOTE(review): listing line 671 (start of the call that stores
   lastContactTime) is missing. */
672  &afw_s_lastContactTime, now, xctx);
673 
674  /* update the peer object */
675  afw_lmdb_journal_update_peer(self, session, adaptor,
676  dbiConsumers, uuid, peer, xctx);
677 }
678 
679 
680 /*
681  * Implementation of method get_entry of interface afw_adaptor_journal.
682  */
683 void
685  const afw_adaptor_journal_t * instance,
686  const afw_adaptor_impl_request_t * impl_request,
688  const afw_utf8_t * consumer_id,
689  const afw_utf8_t * entry_cursor,
690  afw_size_t limit,
691  const afw_object_t * response,
692  afw_xctx_t *xctx)
693 {
694  afw_lmdb_journal_t * self =
695  (afw_lmdb_journal_t *)instance;
696  afw_lmdb_adaptor_session_t *session =
697  (afw_lmdb_adaptor_session_t *)self->session;
698  afw_lmdb_adaptor_t *adaptor =
699  (afw_lmdb_adaptor_t *)session->adaptor;
700  MDB_dbi dbiJournal;
701  MDB_txn *txn;
702 
703  AFW_LMDB_BEGIN_TRANSACTION(adaptor, session, 0, false, xctx) {
704 
705  txn = AFW_LMDB_GET_TRANSACTION();
706 
707  /* Open up our Journal database, for every call option */
708  dbiJournal = afw_lmdb_internal_open_database(adaptor,
709  txn, &afw_lmdb_s_Journal, 0, xctx->p, xctx);
710 
711  switch (option) {
713  afw_lmdb_journal_get_first(self,
714  session, adaptor, dbiJournal, txn, response, xctx);
715  break;
717  afw_lmdb_journal_get_by_cursor(self, session,
718  adaptor, dbiJournal, txn, entry_cursor, response, xctx);
719  break;
721  afw_lmdb_journal_get_next_after_cursor(self, session,
722  adaptor, dbiJournal, txn, entry_cursor, limit, response, xctx);
723  break;
726  self, session, adaptor, dbiJournal, txn, consumer_id,
727  limit, response, xctx);
728  break;
731  self, session, adaptor, dbiJournal, txn, consumer_id,
732  entry_cursor, limit, response, xctx);
733  break;
736  self, session, adaptor, dbiJournal, txn, consumer_id,
737  limit, response, xctx);
738  break;
739  }
740 
741  /* commit our transaction */
743  }
745 }
746 
747 
748 
749 /*
750  * Implementation of method mark_entry_consumed of interface afw_adaptor_journal.
751  */
752 void
754  const afw_adaptor_journal_t * instance,
755  const afw_adaptor_impl_request_t * impl_request,
756  const afw_utf8_t * consumer_id,
757  const afw_utf8_t * entry_cursor,
758  afw_xctx_t *xctx)
759 {
760  afw_lmdb_journal_t * self =
761  (afw_lmdb_journal_t *)instance;
762  afw_lmdb_adaptor_session_t *session =
763  (afw_lmdb_adaptor_session_t *)self->session;
764  afw_lmdb_adaptor_t *adaptor =
765  (afw_lmdb_adaptor_t *)session->adaptor;
766  const afw_object_t *peer;
767  const afw_utf8_t *consume_cursor;
768  const afw_uuid_t *uuid;
769  MDB_dbi dbiConsumers;
770  MDB_txn * txn;
771  const afw_dateTime_t *now;
772 
773  AFW_LMDB_BEGIN_TRANSACTION(adaptor, session, 0, false, xctx) {
774 
775  txn = AFW_LMDB_GET_TRANSACTION();
776 
777  dbiConsumers = afw_lmdb_internal_open_database(adaptor,
778  txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
779 
780  /* lookup the cursor from the database */
781  uuid = afw_uuid_from_utf8(consumer_id, xctx->p, xctx);
782 
783  peer = afw_lmdb_adaptor_journal_get_peer_object(
784  self, session, adaptor, dbiConsumers, txn, uuid, xctx);
785 
786  consume_cursor = afw_object_old_get_property_as_string(peer,
787  &afw_s_consumeCursor, xctx);
788  if (!consume_cursor || !afw_utf8_equal(entry_cursor, consume_cursor)) {
789  AFW_THROW_ERROR_Z(general,
790  "Object id supplied is not currently being consumed", xctx);
791  }
792 
793  /* now, clear consumerStart/consumeCursor */
794  afw_object_remove_property(peer, &afw_s_consumeStartTime, xctx);
795  afw_object_remove_property(peer, &afw_s_consumeCursor, xctx);
796 
797  /* Update lastContactTime property */
798  now = afw_dateTime_now_utc(xctx->p, xctx);
799  afw_object_set_property_as_dateTime(peer, &afw_s_lastContactTime,
800  now, xctx);
801 
802  /* write out the object and commit */
803  afw_lmdb_journal_update_peer(self, session, adaptor,
804  dbiConsumers, uuid, peer, xctx);
805 
807  }
809 }
Adaptive Framework Core API.
Helpers for adaptor implementation development.
Interface afw_interface implementation declares.
Adaptive Framework LMDB Adaptor.
Adaptive Framework register generated (afw_lmdb) header.
Adaptive Framework LMDB Adaptor Internal Header.
#define AFW_LMDB_END_TRANSACTION()
End an LMDB transaction.
#define AFW_LMDB_BEGIN_TRANSACTION(adaptor, session, flags, exclusive, xctx)
Begin an LMDB transaction.
#define AFW_LMDB_COMMIT_TRANSACTION()
Commit a transaction.
void afw_lmdb_journal_get_next_for_consumer_after_cursor(afw_lmdb_journal_t *self, afw_lmdb_adaptor_session_t *session, afw_lmdb_adaptor_t *adaptor, MDB_dbi dbiJournal, MDB_txn *txn, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
void afw_lmdb_journal_advance_cursor_for_consumer(afw_lmdb_journal_t *self, afw_lmdb_adaptor_session_t *session, afw_lmdb_adaptor_t *adaptor, MDB_dbi dbiJournal, MDB_txn *txn, const afw_utf8_t *consumer_id, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
afw_lmdb_journal_t * afw_lmdb_journal_create(afw_lmdb_adaptor_session_t *session, afw_xctx_t *xctx)
Internal function to create an LMDB adaptor journal.
void impl_afw_adaptor_journal_get_next_for_consumer(afw_lmdb_journal_t *self, afw_lmdb_adaptor_session_t *session, afw_lmdb_adaptor_t *adaptor, MDB_dbi dbiJournal, MDB_txn *txn, const afw_utf8_t *consumer_id, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
afw_adaptor_impl_is_journal_entry_applicable(const afw_adaptor_journal_t *instance, const afw_object_t *entry, const afw_object_t *consumer, const afw_value_t *const *filter, afw_xctx_t *xctx)
Determine whether a journal entry is applicable to a consumer.
void impl_afw_adaptor_journal_get_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, afw_adaptor_journal_option_t option, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
const afw_utf8_t * impl_afw_adaptor_journal_add_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_object_t *entry, afw_xctx_t *xctx)
void impl_afw_adaptor_journal_mark_entry_consumed(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_xctx_t *xctx)
afw_object_set_property_as_boolean(const afw_object_t *object, const afw_utf8_t *property_name, afw_boolean_t internal, afw_xctx_t *xctx)
Set property function for data type boolean values.
afw_object_set_property_as_dateTime(const afw_object_t *object, const afw_utf8_t *property_name, const afw_dateTime_t *internal, afw_xctx_t *xctx)
Set property function for data type dateTime values.
#define afw_value_is_object(A_VALUE)
Macro to determine if value is evaluated object.
afw_object_set_property_as_object(const afw_object_t *object, const afw_utf8_t *property_name, const afw_object_t *internal, afw_xctx_t *xctx)
Set property function for data type object values.
afw_value_as_object(const afw_value_t *value, afw_xctx_t *xctx)
Typesafe cast of data type object.
#define afw_object_old_get_property_as_string(object, property_name, xctx)
Get property function for data type string value.
afw_object_set_property_as_string(const afw_object_t *object, const afw_utf8_t *property_name, const afw_utf8_t *internal, afw_xctx_t *xctx)
Set property function for data type string values.
#define AFW_FALSE
Definition: afw_common.h:392
#define AFW_TRUE
Definition: afw_common.h:383
_Bool afw_boolean_t
Definition: afw_common.h:373
apr_size_t afw_size_t
size_t.
Definition: afw_common.h:151
enum afw_adaptor_journal_option_e afw_adaptor_journal_option_t
Typedef for afw_adaptor_journal get_entry options enum.
@ afw_adaptor_journal_option_get_next_for_consumer_after_cursor
afw_adaptor_journal get_entry option get_next_for_consumer_after_cursor
Definition: afw_common.h:1118
@ afw_adaptor_journal_option_get_next_for_consumer
afw_adaptor_journal get_entry option get_next_for_consumer
Definition: afw_common.h:1095
@ afw_adaptor_journal_option_advance_cursor_for_consumer
afw_adaptor_journal get_entry option advance_cursor_for_consumer
Definition: afw_common.h:1145
@ afw_adaptor_journal_option_get_next_after_cursor
afw_adaptor_journal get_entry option get_next_after_cursor
Definition: afw_common.h:1060
@ afw_adaptor_journal_option_get_by_cursor
afw_adaptor_journal get_entry option get_by_cursor
Definition: afw_common.h:1049
@ afw_adaptor_journal_option_get_first
afw_adaptor_journal get_entry option get_first
Definition: afw_common.h:1037
#define afw_content_type_raw_to_value(instance, raw, source_location, p, xctx)
Call method raw_to_value of interface afw_content_type.
#define afw_content_type_object_to_raw(instance, object, options, p, xctx)
Convert object to the raw in specified pool.
#define AFW_THROW_ERROR_RV_Z(code, rv_source_id, rv, message_z, xctx)
Macro used to set error and rv in xctx and throw it.
Definition: afw_error.h:301
#define AFW_THROW_ERROR_Z(code, message_z, xctx)
Macro used to set error and 0 rv in xctx and throw it.
Definition: afw_error.h:283
#define afw_object_get_property(instance, property_name, xctx)
Call method get_property of interface afw_object.
afw_object_remove_property(const afw_object_t *instance, const afw_utf8_t *property_name, afw_xctx_t *xctx)
Remove a property from object.
Definition: afw_object.c:35
afw_object_old_get_property_as_utf8(const afw_object_t *instance, const afw_utf8_t *property_name, const afw_pool_t *p, afw_xctx_t *xctx)
Get an object's property value as a string in specified pool.
Definition: afw_object.c:531
afw_dateTime_now_utc(const afw_pool_t *p, afw_xctx_t *xctx)
Get now time as dateTime in specified pool.
Definition: afw_time.c:593
afw_boolean_t afw_utf8_equal(const afw_utf8_t *s1, const afw_utf8_t *s2)
Check to see if a string equals another string.
const afw_utf8_z_t * afw_utf8_to_utf8_z(const afw_utf8_t *string, const afw_pool_t *p, afw_xctx_t *xctx)
Convert utf8 to utf8_z in specified pool.
Definition: afw_utf8.h:529
afw_utf8_printf(const afw_pool_t *p, afw_xctx_t *xctx, const afw_utf8_z_t *format,...)
Create a utf-8 string using a c format string in specified pool.
Definition: afw_utf8.c:459
afw_uuid_from_utf8(const afw_utf8_t *s, const afw_pool_t *p, afw_xctx_t *xctx)
Convert standard format UUID utf-8 string to uuid.
Definition: afw_uuid.c:117
afw_uuid_to_utf8(const afw_uuid_t *uuid, const afw_pool_t *p, afw_xctx_t *xctx)
Convert uuid to a standard format UUID utf-8 string.
Definition: afw_uuid.c:79
#define afw_xctx_calloc_type(type, xctx)
Macro to allocate cleared memory to hold type in xctx's pool.
Definition: afw_xctx.h:199
Internal request info used by afw_adaptor_impl*() functions.
Interface afw_adaptor_journal public struct.
Interface afw_adaptor_session public struct.
date, time, and time zone.
Definition: afw_common.h:1760
Struct for memory pointer and size.
Definition: afw_common.h:505
Interface afw_object public struct.
NFC normalized UTF-8 string.
Definition: afw_common.h:545
Interface afw_value public struct.
Interface afw_xctx public struct.