19 #define AFW_IMPLEMENTATION_ID "file"
32 unsigned char century;
39 return &impl_afw_adaptor_journal_inf;
68 #define IMPL_RELATIVE_ENTRY_PATH_WA_Z_SIZE sizeof("y2016/m07/d04/h23")
70 impl_object_id_to_relative_entry_path(
80 if (entry_object_id->len < strlen(
"ccyymmddhh_0"))
return NULL;
81 o = &relative_entry_path_wa_z[0];
82 i = entry_object_id->s;
101 if (*i++ !=
'_')
return NULL;
102 for (*offset = 0, count = entry_object_id->len - strlen(
"ccyymmddhh_");
105 *offset = *offset * 10 + (*i++ -
'0');
108 return &relative_entry_path_wa_z[0];
127 impl_relative_entry_path_to_object_id(
136 impl_open_and_retrieve_peer_object(
139 apr_file_t * *peer_f,
163 rv = apr_stat(&finfo, *full_peer_path_z, APR_FINFO_SIZE,
165 if (rv != APR_SUCCESS)
goto error_peer_apr;
168 rv = apr_file_open(peer_f, *full_peer_path_z,
169 APR_FOPEN_READ + APR_FOPEN_WRITE + APR_FOPEN_BINARY,
170 APR_FPROT_OS_DEFAULT, apr_p);
171 if (rv != APR_SUCCESS)
goto error_peer_apr;
178 rv = apr_file_read(*peer_f, memory, &len);
179 if (rv != APR_SUCCESS)
goto error_peer;
180 if (len != finfo.size)
goto error_peer;
184 consumer_id, xctx->p, xctx);
196 *full_peer_path_z, footprint.z);
204 *full_peer_path_z, footprint.z);
209 impl_write_and_close_peer_object(
227 adaptor->content_type, peer, NULL, xctx->p, xctx);
232 rv = apr_file_seek(peer_f, APR_SET, &offset);
233 if (rv != APR_SUCCESS)
goto error_peer_apr;
238 rv = apr_file_write(peer_f, encoded->ptr, &len);
239 if (rv != APR_SUCCESS)
goto error_peer_apr;
240 if (len != encoded->size) {
247 rv = apr_file_trunc(peer_f, encoded->size);
248 if (rv != APR_SUCCESS)
goto error_peer_apr;
252 rv = apr_file_close(peer_f);
253 if (rv != APR_SUCCESS)
goto error_peer_apr;
264 full_peer_path_z, footprint.z);
308 adaptor->content_type, entry, NULL, xctx->p, xctx);
312 rv = apr_time_exp_gmt(&now, apr_time_now());
313 if (rv != APR_SUCCESS) {
319 "y%04d/m%02d/d%02d/h%02d",
320 now.tm_year + 1900, now.tm_mon + 1, now.tm_mday, now.tm_hour);
324 rv = apr_file_open(&lock_f, adaptor->journal_lock_file_path_z,
325 APR_FOPEN_READ + APR_FOPEN_CREATE + APR_FOPEN_WRITE +
326 APR_FOPEN_BINARY, APR_FPROT_OS_DEFAULT, apr_p);
327 if (APR_STATUS_IS_ENOENT(rv)) {
330 rv = apr_dir_make_recursive(adaptor->journal_dir_path_z,
331 APR_FPROT_OS_DEFAULT, apr_p);
332 if (rv == APR_SUCCESS) {
335 rv = apr_file_open(&lock_f, adaptor->journal_lock_file_path_z,
336 APR_FOPEN_READ + APR_FOPEN_CREATE + APR_FOPEN_WRITE +
337 APR_FOPEN_BINARY, APR_FPROT_OS_DEFAULT, apr_p);
340 if (rv != APR_SUCCESS)
goto error_lock_apr;
348 rv = apr_file_read(lock_f, &lock, &lock_len);
349 first_entry = rv == APR_EOF;
350 if (!first_entry && rv != APR_SUCCESS)
goto error_lock_apr;
354 "/path_to_first_journal_file",
356 temp_raw.ptr = (
const afw_byte_t *)relative_entry_path_z;
357 temp_raw.size = strlen(relative_entry_path_z);
359 afw_file_mode_write, xctx);
371 old_full_entry_path_z = NULL;
373 ( lock.century != now.tm_year / 100 + 19
374 || lock.year != now.tm_year % 100
375 || lock.month != now.tm_mon + 1
376 || lock.day != now.tm_mday
377 || lock.hour != now.tm_hour)
382 "/y%02d%02d/m%02d/d%02d/h%02d",
384 lock.century, lock.year, lock.month, lock.day, lock.hour);
389 lock.century = (now.tm_year / 100) + 19;
390 lock.year = now.tm_year % 100;
391 lock.month = now.tm_mon + 1;
392 lock.day = now.tm_mday;
393 lock.hour = now.tm_hour;
394 lock.min = now.tm_min;
395 lock.sec = now.tm_sec;
407 if (old_full_entry_path_z || first_entry) {
410 "/y%02d%02d/m%02d/d%02d/",
412 lock.century, lock.year, lock.month, lock.day);
414 rv = apr_dir_make_recursive(full_entry_dir_path_z,
415 APR_FPROT_OS_DEFAULT, apr_p);
416 if (rv != APR_SUCCESS)
goto error_journal_apr;
421 rv = apr_file_open(&entry_f, full_entry_path_z,
426 APR_FPROT_OS_DEFAULT, apr_p);
427 if (rv != APR_SUCCESS)
goto error_journal_apr;
432 rv = apr_file_seek(entry_f, APR_CUR, &offset);
433 if (rv != APR_SUCCESS)
goto error_journal_apr;
436 lock.century, lock.year, lock.month, lock.day, lock.hour,
441 len =
sizeof(encoded_len_be);
442 rv = apr_file_write(entry_f, &encoded_len_be, &len);
443 if (rv != APR_SUCCESS)
goto error_journal_apr;
448 rv = apr_file_write(entry_f, encoded->ptr, &len);
449 if (rv != APR_SUCCESS)
goto error_journal_apr;
453 rv = apr_file_close(entry_f);
454 if (rv != APR_SUCCESS)
goto error_journal_apr;
461 if (old_full_entry_path_z) {
464 rv = apr_file_open(&entry_f, old_full_entry_path_z,
468 APR_FPROT_OS_DEFAULT, apr_p);
469 if (rv != APR_SUCCESS)
goto error_old_journal_apr;
473 len =
sizeof(encoded_len_be);
474 encoded_len_be.i = 0;
475 rv = apr_file_write(entry_f, &encoded_len_be, &len);
476 if (rv != APR_SUCCESS)
goto error_old_journal_apr;
480 len = strlen(relative_entry_path_z);
481 rv = apr_file_write(entry_f, relative_entry_path_z, &len);
482 if (rv != APR_SUCCESS)
goto error_old_journal_apr;
486 rv = apr_file_close(entry_f);
487 if (rv != APR_SUCCESS)
goto error_old_journal_apr;
495 rv = apr_file_seek(lock_f, APR_SET, &offset);
496 if (rv != APR_SUCCESS)
goto error_lock_apr;
501 rv = apr_file_write(lock_f, &lock, &len);
502 if (rv != APR_SUCCESS)
goto error_lock_apr;
506 rv = apr_file_close(lock_f);
507 if (rv != APR_SUCCESS)
goto error_lock_apr;
515 " journal lock file %s - %s",
517 adaptor->journal_lock_file_path_z, footprint.z);
521 "Error detected while processing "
524 adaptor->journal_lock_file_path_z, footprint.z);
529 " journal file %s - %s",
531 full_entry_path_z, footprint.z);
533 error_old_journal_apr:
536 " journal file %s - %s",
538 old_full_entry_path_z, footprint.z);
583 afw_utf8_z_t relative_entry_path_wa_z[IMPL_RELATIVE_ENTRY_PATH_WA_Z_SIZE];
598 skip_first_entry =
false;
600 use_consumer =
false;
601 use_consumer_cursors =
false;
602 advance_consumer_cursor =
false;
603 check_filter =
false;
605 entry_object_id = NULL;
609 full_peer_path_z = NULL;
610 full_entry_path_z = NULL;
622 skip_first_entry =
true;
623 entry_object_id = entry_cursor;
628 use_consumer_cursors =
true;
633 skip_first_entry =
true;
634 entry_object_id = entry_cursor;
642 advance_consumer_cursor =
true;
646 entry_object_id = entry_cursor;
651 "Invalid option %d passed to afw_adaptor_journal_get_entry()",
659 peer = impl_open_and_retrieve_peer_object(instance, consumer_id,
660 &peer_f, &full_peer_path_z, xctx);
663 if (use_consumer_cursors) {
665 &afw_s_currentCursor, xctx);
667 &afw_s_consumeCursor, xctx);
669 &afw_s_advanceCursor, xctx);
671 entry_object_id = consumeCursor;
672 check_filter =
false;
675 else if (advanceCursor) {
676 entry_object_id = advanceCursor;
677 skip_first_entry =
false;
679 else if (currentCursor) {
680 entry_object_id = currentCursor;
681 skip_first_entry =
true;
693 "/path_to_first_journal_file",
704 else if (entry_object_id) {
705 relative_entry_path_z = impl_object_id_to_relative_entry_path(
706 entry_object_id, relative_entry_path_wa_z, &offset, xctx);
707 if (!relative_entry_path_z)
return;
712 "get_first or entry_object_id required",
721 entry_object_id = impl_relative_entry_path_to_object_id(
722 relative_entry_path_z, offset, xctx);
726 open_journal =
false;
728 adaptor->journal_dir_path_z,
729 relative_entry_path_z, NULL);
731 rv = apr_file_open(&entry_f, full_entry_path_z,
732 APR_FOPEN_READ + APR_FOPEN_BINARY, APR_FPROT_OS_DEFAULT,
734 if (rv != APR_SUCCESS)
goto error_journal_apr;
739 rv = apr_file_seek(entry_f, APR_SET, &offset);
740 if (rv != APR_SUCCESS)
goto error_journal_apr;
741 len =
sizeof(encoded_len_be);
743 rv = apr_file_read(entry_f, &encoded_len_be, &len);
744 if (APR_STATUS_IS_EOF(rv))
break;
745 if (rv != APR_SUCCESS)
goto error_journal_apr;
747 if (len !=
sizeof(encoded_len_be))
goto error_journal;
749 encoded_len_be, xctx);
755 len =
sizeof(relative_entry_path_wa_z);
756 rv = apr_file_read(entry_f, &relative_entry_path_wa_z[0], &len);
757 if (rv != APR_SUCCESS)
goto error_journal_apr;
758 relative_entry_path_z = &relative_entry_path_wa_z[0];
760 rv = apr_file_close(entry_f);
761 if (rv != APR_SUCCESS)
goto error_journal_apr;
768 if (!skip_first_entry) {
775 rv = apr_file_read(entry_f, memory, &len);
776 if (rv != APR_SUCCESS)
goto error_journal_apr;
778 if (len != buffer.size)
goto error_journal;
780 consumer_id, p, xctx);
793 else if (check_filter) {
795 instance, entry, peer, &filter, xctx);
804 if (applicable)
break;
808 skip_first_entry =
false;
809 offset = encoded_len + offset +
sizeof(encoded_len_be);
813 rv = apr_file_close(entry_f);
814 if (rv != APR_SUCCESS)
goto error_journal_apr;
823 &afw_s_lastContactTime, now, xctx);
826 if (advance_consumer_cursor) {
828 &afw_s_advanceCursor, entry_object_id, xctx);
832 else if (use_consumer_cursors && !reissue) {
840 &afw_s_currentCursor, entry_object_id, xctx);
842 &afw_s_consumeCursor, entry_object_id, xctx);
844 &afw_s_consumeStartTime, now, xctx);
852 &afw_s_advanceCursor, entry_object_id, xctx);
858 impl_write_and_close_peer_object(instance, peer, peer_f,
859 full_peer_path_z, xctx);
865 entry_object_id, xctx);
871 if (applicable && !advance_consumer_cursor) {
879 "Error detected processing adaptor %" AFW_UTF8_FMT " journal file %s - %s",
880 adaptor->pub.adaptor_id.len, adaptor->pub.adaptor_id.s,
881 full_entry_path_z, footprint.z);
885 "Error detected while processing "
888 full_entry_path_z, footprint.z);
911 peer = impl_open_and_retrieve_peer_object(instance,
912 consumer_id, &peer_f,
913 &full_peer_path_z, xctx);
916 &afw_s_consumeCursor, xctx);
917 if (!consume_cursor || !
afw_utf8_equal(entry_cursor, consume_cursor)) {
919 "Object id supplied is not currently being consumed", xctx);
932 impl_write_and_close_peer_object(instance, peer, peer_f, full_peer_path_z,
Interface afw_interface implementation declares.
Adaptive Framework Core Internal.
afw_adaptor_impl_is_journal_entry_applicable(const afw_adaptor_journal_t *instance, const afw_object_t *entry, const afw_object_t *consumer, const afw_value_t *const *filter, afw_xctx_t *xctx)
Determine whether a journal entry is applicable to a consumer.
void impl_afw_adaptor_journal_get_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, afw_adaptor_journal_option_t option, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
const afw_utf8_t * impl_afw_adaptor_journal_add_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_object_t *entry, afw_xctx_t *xctx)
void impl_afw_adaptor_journal_mark_entry_consumed(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_xctx_t *xctx)
afw_object_set_property_as_boolean(const afw_object_t *object, const afw_utf8_t *property_name, afw_boolean_t internal, afw_xctx_t *xctx)
Set property function for data type boolean values.
afw_object_set_property_as_dateTime(const afw_object_t *object, const afw_utf8_t *property_name, const afw_dateTime_t *internal, afw_xctx_t *xctx)
Set property function for data type dateTime values.
#define afw_value_is_object(A_VALUE)
Macro to determine if value is evaluated object.
afw_object_set_property_as_object(const afw_object_t *object, const afw_utf8_t *property_name, const afw_object_t *internal, afw_xctx_t *xctx)
Set property function for data type object values.
#define afw_object_old_get_property_as_string(object, property_name, xctx)
Get property function for data type string value.
afw_object_set_property_as_string(const afw_object_t *object, const afw_utf8_t *property_name, const afw_utf8_t *internal, afw_xctx_t *xctx)
Set property function for data type string values.
#define AFW_UTF8_FMT_ARG(A_STRING)
Convenience Macro for use with AFW_UTF8_FMT to specify arg.
#define AFW_INTEGER_FMT
Format string specifier used for afw_integer_t.
unsigned char afw_byte_t
A byte of memory (unsigned).
#define AFW_UTF8_FMT
Format string specifier used for afw_utf8_t.
afw_utf8_octet_t afw_utf8_z_t
NFC normalized UTF-8 null terminated string.
apr_size_t afw_size_t
size_t.
apr_int64_t afw_integer_t
typedef for big signed int.
enum afw_adaptor_journal_option_e afw_adaptor_journal_option_t
Typedef for afw_adaptor_journal get_entry options enum.
apr_off_t afw_off_t
off_t.
@ afw_adaptor_journal_option_get_next_for_consumer_after_cursor
afw_adaptor_journal get_entry option get_next_for_consumer_after_cursor
@ afw_adaptor_journal_option_get_next_for_consumer
afw_adaptor_journal get_entry option get_next_for_consumer
@ afw_adaptor_journal_option_advance_cursor_for_consumer
afw_adaptor_journal get_entry option advance_cursor_for_consumer
@ afw_adaptor_journal_option_get_next_after_cursor
afw_adaptor_journal get_entry option get_next_after_cursor
@ afw_adaptor_journal_option_get_by_cursor
afw_adaptor_journal get_entry option get_by_cursor
@ afw_adaptor_journal_option_get_first
afw_adaptor_journal get_entry option get_first
#define afw_content_type_raw_to_value(instance, raw, source_location, p, xctx)
Call method raw_to_value of interface afw_content_type.
#define afw_content_type_object_to_raw(instance, object, options, p, xctx)
Convert object to the raw in specified pool.
afw_size_t afw_endian_safe_big_uint64_to_native_size_t(afw_endian_big_uint64_t big, afw_xctx_t *xctx)
Safe afw_endian_big_uint64_t to native afw_size_t.
afw_endian_big_uint64_t afw_endian_native_to_big_uint64(afw_uint64_t native)
Convert native afw_uint64_t to afw_endian_big_uint64_t.
afw_endian_big_uint32_t afw_endian_native_to_big_uint32(afw_uint32_t native)
Convert native afw_uint32_t to afw_endian_big_uint32_t.
#define AFW_THROW_ERROR_RV_Z(code, rv_source_id, rv, message_z, xctx)
Macro used to set error and rv in xctx and throw it.
#define AFW_ERROR_FOOTPRINT(_footprint_)
Set error footprint.
#define AFW_THROW_ERROR_FZ(code, xctx, format_z,...)
Macro used to set error and 0 rv in xctx and throw it.
#define AFW_THROW_ERROR_FOOTPRINT_FZ(code, xctx, format_z,...)
Macro used to set error and 0 rv in xctx using line number in footprint then throw it.
#define AFW_THROW_ERROR_Z(code, message_z, xctx)
Macro used to set error and 0 rv in xctx and throw it.
#define AFW_THROW_ERROR_FOOTPRINT_RV_FZ(code, rv_source_id, rv, xctx, format_z,...)
Macro used to set error and rv in xctx using line number in footprint then throw it.
afw_file_from_memory(const afw_utf8_t *file_path, const afw_memory_t *from_memory, afw_file_mode_t mode, afw_xctx_t *xctx)
Write a file from a memory.
afw_file_to_memory(const afw_utf8_t *file_path, apr_size_t file_size, const afw_pool_t *p, afw_xctx_t *xctx)
Read a file into a memory in a specified pool.
afw_object_remove_property(const afw_object_t *instance, const afw_utf8_t *property_name, afw_xctx_t *xctx)
Remove a property from object.
#define AFW_OBJECT_Q_OBJECT_TYPE_ID_PROVISIONING_PEER
Quoted object type id for Provisioning Peer object.
#define AFW_OBJECT_Q_OBJECT_TYPE_ID_JOURNAL_ENTRY
Quoted object type id for Journal Entry object.
#define afw_pool_get_apr_pool(instance)
Call method get_apr_pool of interface afw_pool.
afw_size_t afw_safe_cast_off_to_size(afw_off_t off, afw_xctx_t *xctx)
Safely cast afw_off_t to afw_size_t.
afw_dateTime_now_utc(const afw_pool_t *p, afw_xctx_t *xctx)
Get now time as dateTime in specified pool.
const afw_utf8_t * afw_utf8_from_raw(const afw_memory_t *raw, const afw_pool_t *p, afw_xctx_t *xctx)
Convert raw to a utf-8 NFC normalizing if necessary in specified pool.
afw_boolean_t afw_utf8_equal(const afw_utf8_t *s1, const afw_utf8_t *s2)
Check to see if a string equals another string.
const afw_utf8_z_t * afw_utf8_z_printf(const afw_pool_t *p, afw_xctx_t *xctx, const afw_utf8_z_t *format_z,...)
const afw_utf8_z_t * afw_utf8_to_utf8_z(const afw_utf8_t *string, const afw_pool_t *p, afw_xctx_t *xctx)
Convert utf8 to utf8_z in specified pool.
const afw_utf8_z_t * afw_utf8_z_concat(const afw_pool_t *p, afw_xctx_t *xctx,...)
afw_utf8_printf(const afw_pool_t *p, afw_xctx_t *xctx, const afw_utf8_z_t *format,...)
Create a utf-8 string using a c format string in specified pool.
#define afw_xctx_calloc(size, xctx)
Macro to allocate cleared memory in xctx's pool.
Internal request info used by afw_adaptor_impl*() functions.
Interface afw_adaptor_journal_inf_s struct.
Interface afw_adaptor_journal public struct.
date, time, and time zone.
Struct for memory pointer and size.
Interface afw_object public struct.
Interface afw_pool public struct.
NFC normalized UTF-8 string.
struct for data type object values.
Interface afw_value public struct.
Interface afw_xctx public struct.
32-bit unsigned big endian integer.
64-bit unsigned big endian integer.