23 #define AFW_IMPLEMENTATION_ID "lmdb"
35 self->pub.inf = &impl_afw_adaptor_journal_inf;
70 dbi = afw_lmdb_internal_open_database(adaptor,
71 AFW_LMDB_GET_TRANSACTION(),
72 &afw_lmdb_s_Journal, MDB_CREATE, xctx->p, xctx);
75 cursor = afw_lmdb_internal_open_cursor(session, dbi, xctx);
79 "Error getting cursor for journal database.", xctx);
82 rc = mdb_cursor_get(cursor, &key, &data, MDB_LAST);
83 if (rc == MDB_NOTFOUND) {
86 key.mv_data = (
void *) &t;
87 key.mv_size =
sizeof(t);
89 t = *((apr_uint64_t *)(key.mv_data));
90 AFW_ENDIAN_BIG_TO_NATIVE_64(&t);
95 AFW_ENDIAN_NATIVE_TO_BIG_64(&t);
100 adaptor->ubjson, entry, NULL, xctx->p, xctx);
101 data.mv_data = (
void *)entry_string->ptr;
102 data.mv_size = entry_string->size;
104 rc = mdb_cursor_put(cursor, &key, &data, MDB_APPEND);
108 "Error appending journal entry database.", xctx);
111 mdb_cursor_close(cursor);
118 AFW_ENDIAN_BIG_TO_NATIVE_64(&t);
126 afw_lmdb_adaptor_journal_get_peer_object(
132 const afw_uuid_t *uuid,
140 memset(&data, 0,
sizeof(MDB_val));
141 key.mv_data = (
void*)uuid;
142 key.mv_size =
sizeof(afw_uuid_t);
144 if (mdb_get(txn, dbi, &key, &data) == 0) {
145 raw.size = data.mv_size;
146 raw.ptr = data.mv_data;
163 afw_lmdb_journal_update_peer(
168 const afw_uuid_t *uuid,
176 afw_lmdb_internal_replace_entry_from_object(session,
177 &afw_s__AdaptiveJournalEntry_, object_id, updated_object,
182 afw_lmdb_adaptor_journal_get_entry_object(
197 AFW_ENDIAN_NATIVE_TO_BIG_64(&cursor);
199 memset(&data, 0,
sizeof(MDB_val));
200 key.mv_data = (
void *)&cursor;
201 key.mv_size =
sizeof(cursor);
203 if (mdb_get(txn, dbi, &key, &data) == 0) {
204 raw.size = data.mv_size;
205 raw.ptr = data.mv_data;
222 afw_lmdb_journal_get_first(
237 entry = afw_lmdb_adaptor_journal_get_entry_object(
238 self, session, adaptor, dbiJournal, txn, cursor, xctx);
250 afw_lmdb_journal_get_by_cursor(
263 cursor = apr_strtoi64(
266 entry = afw_lmdb_adaptor_journal_get_entry_object(
267 self, session, adaptor, dbiJournal, txn, cursor, xctx);
279 afw_lmdb_journal_get_next_after_cursor(
294 cursor = apr_strtoi64(
297 entry = afw_lmdb_adaptor_journal_get_entry_object(
298 self, session, adaptor, dbiJournal, txn, cursor, xctx);
323 MDB_dbi dbiConsumers;
325 const afw_uuid_t *uuid;
336 cursor = apr_strtoi64(
339 dbiConsumers = afw_lmdb_internal_open_database(session->adaptor,
340 txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
345 peer = afw_lmdb_adaptor_journal_get_peer_object(
346 self, session, adaptor, dbiConsumers, txn, uuid, xctx);
349 "Error, provisioning peer not found.", xctx);
353 peer, &afw_s_advanceCursor, xctx->p, xctx);
355 peer, &afw_s_consumeFilter, xctx);
358 for (i = 0; (i < limit) || !found; i++) {
359 entry = afw_lmdb_adaptor_journal_get_entry_object(
self,
360 session, adaptor, dbiJournal, txn, cursor, xctx);
366 entry, peer, &consumer_filter, xctx)) {
385 &afw_s_consumeStartTime, now, xctx);
387 &afw_s_consumeCursor, cursor_str, xctx);
389 &afw_s_currentCursor, cursor_str, xctx);
391 &afw_s_advanceCursor, xctx);
395 &afw_s_entryCursor, cursor_str, xctx);
399 &afw_s_entry, entry, xctx);
402 if (advance_cursor) {
407 &afw_s_advanceCursor, cursor_str, xctx);
412 &afw_s_lastContactTime, now, xctx);
415 afw_lmdb_journal_update_peer(
self, session, adaptor,
416 dbiConsumers, uuid, peer, xctx);
433 MDB_dbi dbiConsumers;
434 const afw_uuid_t *uuid;
445 dbiConsumers = afw_lmdb_internal_open_database(session->adaptor,
446 txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
451 peer = afw_lmdb_adaptor_journal_get_peer_object(
452 self, session, adaptor, dbiConsumers, txn, uuid, xctx);
455 "Error, provisioning peer not found.", xctx);
459 peer, &afw_s_currentCursor, xctx->p, xctx);
461 peer, &afw_s_advanceCursor, xctx->p, xctx);
463 peer, &afw_s_consumeFilter, xctx);
465 peer, &afw_s_consumeCursor, xctx);
472 cursor = apr_strtoi64(
474 else if (advance_cursor)
475 cursor = apr_strtoi64(
477 else if (current_cursor)
478 cursor = apr_strtoi64(
484 for (i = 0; (i < limit) || !found; i++) {
485 entry = afw_lmdb_adaptor_journal_get_entry_object(
self,
486 session, adaptor, dbiJournal, txn, cursor, xctx);
492 entry, peer, &consumer_filter, xctx)) {
510 if (consume_cursor) {
516 &afw_s_consumeStartTime, now, xctx);
518 &afw_s_consumeCursor, cursor_str, xctx);
520 &afw_s_currentCursor, cursor_str, xctx);
522 &afw_s_advanceCursor, xctx);
527 &afw_s_entryCursor, cursor_str, xctx);
531 &afw_s_entry, entry, xctx);
534 if (advance_cursor) {
539 &afw_s_advanceCursor, cursor_str, xctx);
544 &afw_s_lastContactTime, now, xctx);
547 afw_lmdb_journal_update_peer(
self, session, adaptor,
548 dbiConsumers, uuid, peer, xctx);
565 MDB_dbi dbiConsumers;
566 const afw_uuid_t *uuid;
577 dbiConsumers = afw_lmdb_internal_open_database(session->adaptor,
578 txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
583 peer = afw_lmdb_adaptor_journal_get_peer_object(
584 self, session, adaptor, dbiConsumers, txn, uuid, xctx);
587 "Error, provisioning peer not found.", xctx);
591 peer, &afw_s_currentCursor, xctx->p, xctx);
593 peer, &afw_s_advanceCursor, xctx->p, xctx);
595 peer, &afw_s_consumeFilter, xctx);
597 peer, &afw_s_consumeCursor, xctx);
604 cursor = apr_strtoi64(
606 else if (advance_cursor)
607 cursor = apr_strtoi64(
609 else if (current_cursor)
610 cursor = apr_strtoi64(
616 for (i = 0; (i < limit) || !found; i++) {
617 entry = afw_lmdb_adaptor_journal_get_entry_object(
self,
618 session, adaptor, dbiJournal, txn, cursor, xctx);
624 entry, peer, &consumer_filter, xctx)) {
642 if (consume_cursor) {
648 &afw_s_consumeStartTime, now, xctx);
650 &afw_s_consumeCursor, cursor_str, xctx);
652 &afw_s_currentCursor, cursor_str, xctx);
654 &afw_s_advanceCursor, xctx);
659 &afw_s_entryCursor, cursor_str, xctx);
662 if (advance_cursor) {
667 &afw_s_advanceCursor, cursor_str, xctx);
672 &afw_s_lastContactTime, now, xctx);
675 afw_lmdb_journal_update_peer(
self, session, adaptor,
676 dbiConsumers, uuid, peer, xctx);
705 txn = AFW_LMDB_GET_TRANSACTION();
708 dbiJournal = afw_lmdb_internal_open_database(adaptor,
709 txn, &afw_lmdb_s_Journal, 0, xctx->p, xctx);
713 afw_lmdb_journal_get_first(
self,
714 session, adaptor, dbiJournal, txn, response, xctx);
717 afw_lmdb_journal_get_by_cursor(
self, session,
718 adaptor, dbiJournal, txn, entry_cursor, response, xctx);
721 afw_lmdb_journal_get_next_after_cursor(
self, session,
722 adaptor, dbiJournal, txn, entry_cursor, limit, response, xctx);
726 self, session, adaptor, dbiJournal, txn, consumer_id,
727 limit, response, xctx);
731 self, session, adaptor, dbiJournal, txn, consumer_id,
732 entry_cursor, limit, response, xctx);
736 self, session, adaptor, dbiJournal, txn, consumer_id,
737 limit, response, xctx);
768 const afw_uuid_t *uuid;
769 MDB_dbi dbiConsumers;
775 txn = AFW_LMDB_GET_TRANSACTION();
777 dbiConsumers = afw_lmdb_internal_open_database(adaptor,
778 txn, &afw_lmdb_s_Primary, 0, xctx->p, xctx);
783 peer = afw_lmdb_adaptor_journal_get_peer_object(
784 self, session, adaptor, dbiConsumers, txn, uuid, xctx);
787 &afw_s_consumeCursor, xctx);
788 if (!consume_cursor || !
afw_utf8_equal(entry_cursor, consume_cursor)) {
790 "Object id supplied is not currently being consumed", xctx);
803 afw_lmdb_journal_update_peer(
self, session, adaptor,
804 dbiConsumers, uuid, peer, xctx);
Adaptive Framework Core API.
Helpers for adaptor implementation development.
Implementation declarations for interface afw_interface.
Adaptive Framework LMDB Adaptor.
Adaptive Framework register generated (afw_lmdb) header.
Adaptive Framework LMDB Adaptor Internal Header.
#define AFW_LMDB_END_TRANSACTION()
End an LMDB transaction.
#define AFW_LMDB_BEGIN_TRANSACTION(adaptor, session, flags, exclusive, xctx)
Begin an LMDB transaction.
#define AFW_LMDB_COMMIT_TRANSACTION()
Commit a transaction.
void afw_lmdb_journal_get_next_for_consumer_after_cursor(afw_lmdb_journal_t *self, afw_lmdb_adaptor_session_t *session, afw_lmdb_adaptor_t *adaptor, MDB_dbi dbiJournal, MDB_txn *txn, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
void afw_lmdb_journal_advance_cursor_for_consumer(afw_lmdb_journal_t *self, afw_lmdb_adaptor_session_t *session, afw_lmdb_adaptor_t *adaptor, MDB_dbi dbiJournal, MDB_txn *txn, const afw_utf8_t *consumer_id, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
afw_lmdb_journal_t * afw_lmdb_journal_create(afw_lmdb_adaptor_session_t *session, afw_xctx_t *xctx)
Internally create an LMDB adaptor journal.
void impl_afw_adaptor_journal_get_next_for_consumer(afw_lmdb_journal_t *self, afw_lmdb_adaptor_session_t *session, afw_lmdb_adaptor_t *adaptor, MDB_dbi dbiJournal, MDB_txn *txn, const afw_utf8_t *consumer_id, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
afw_adaptor_impl_is_journal_entry_applicable(const afw_adaptor_journal_t *instance, const afw_object_t *entry, const afw_object_t *consumer, const afw_value_t *const *filter, afw_xctx_t *xctx)
Determine whether a journal entry is applicable to a consumer.
void impl_afw_adaptor_journal_get_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, afw_adaptor_journal_option_t option, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
const afw_utf8_t * impl_afw_adaptor_journal_add_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_object_t *entry, afw_xctx_t *xctx)
void impl_afw_adaptor_journal_mark_entry_consumed(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_xctx_t *xctx)
afw_object_set_property_as_boolean(const afw_object_t *object, const afw_utf8_t *property_name, afw_boolean_t internal, afw_xctx_t *xctx)
Set property function for data type boolean values.
afw_object_set_property_as_dateTime(const afw_object_t *object, const afw_utf8_t *property_name, const afw_dateTime_t *internal, afw_xctx_t *xctx)
Set property function for data type dateTime values.
#define afw_value_is_object(A_VALUE)
Macro to determine if value is evaluated object.
afw_object_set_property_as_object(const afw_object_t *object, const afw_utf8_t *property_name, const afw_object_t *internal, afw_xctx_t *xctx)
Set property function for data type object values.
afw_value_as_object(const afw_value_t *value, afw_xctx_t *xctx)
Typesafe cast of data type object.
#define afw_object_old_get_property_as_string(object, property_name, xctx)
Get property function for data type string value.
afw_object_set_property_as_string(const afw_object_t *object, const afw_utf8_t *property_name, const afw_utf8_t *internal, afw_xctx_t *xctx)
Set property function for data type string values.
apr_size_t afw_size_t
size_t.
enum afw_adaptor_journal_option_e afw_adaptor_journal_option_t
Typedef for afw_adaptor_journal get_entry options enum.
@ afw_adaptor_journal_option_get_next_for_consumer_after_cursor
afw_adaptor_journal get_entry option get_next_for_consumer_after_cursor
@ afw_adaptor_journal_option_get_next_for_consumer
afw_adaptor_journal get_entry option get_next_for_consumer
@ afw_adaptor_journal_option_advance_cursor_for_consumer
afw_adaptor_journal get_entry option advance_cursor_for_consumer
@ afw_adaptor_journal_option_get_next_after_cursor
afw_adaptor_journal get_entry option get_next_after_cursor
@ afw_adaptor_journal_option_get_by_cursor
afw_adaptor_journal get_entry option get_by_cursor
@ afw_adaptor_journal_option_get_first
afw_adaptor_journal get_entry option get_first
#define afw_content_type_raw_to_value(instance, raw, source_location, p, xctx)
Call method raw_to_value of interface afw_content_type.
#define afw_content_type_object_to_raw(instance, object, options, p, xctx)
Convert object to raw in the specified pool.
#define AFW_THROW_ERROR_RV_Z(code, rv_source_id, rv, message_z, xctx)
Macro used to set error and rv in xctx and throw it.
#define AFW_THROW_ERROR_Z(code, message_z, xctx)
Macro used to set error and 0 rv in xctx and throw it.
#define afw_object_get_property(instance, property_name, xctx)
Call method get_property of interface afw_object.
afw_object_remove_property(const afw_object_t *instance, const afw_utf8_t *property_name, afw_xctx_t *xctx)
Remove a property from object.
afw_object_old_get_property_as_utf8(const afw_object_t *instance, const afw_utf8_t *property_name, const afw_pool_t *p, afw_xctx_t *xctx)
Get an object's property value as a string in specified pool.
afw_dateTime_now_utc(const afw_pool_t *p, afw_xctx_t *xctx)
Get now time as dateTime in specified pool.
afw_boolean_t afw_utf8_equal(const afw_utf8_t *s1, const afw_utf8_t *s2)
Check to see if a string equals another string.
const afw_utf8_z_t * afw_utf8_to_utf8_z(const afw_utf8_t *string, const afw_pool_t *p, afw_xctx_t *xctx)
Convert utf8 to utf8_z in specified pool.
afw_utf8_printf(const afw_pool_t *p, afw_xctx_t *xctx, const afw_utf8_z_t *format,...)
Create a utf-8 string using a c format string in specified pool.
afw_uuid_from_utf8(const afw_utf8_t *s, const afw_pool_t *p, afw_xctx_t *xctx)
Convert standard format UUID utf-8 string to uuid.
afw_uuid_to_utf8(const afw_uuid_t *uuid, const afw_pool_t *p, afw_xctx_t *xctx)
Convert uuid to a standard format UUID utf-8 string.
#define afw_xctx_calloc_type(type, xctx)
Macro to allocate cleared memory to hold type in xctx's pool.
Internal request info used by afw_adaptor_impl*() functions.
Interface afw_adaptor_journal public struct.
Interface afw_adaptor_session public struct.
date, time, and time zone.
Struct for memory pointer and size.
Interface afw_object public struct.
NFC normalized UTF-8 string.
Interface afw_value public struct.
Interface afw_xctx public struct.