Adaptive Framework  0.9.0
All Data Structures Files Functions Variables Typedefs Enumerations Enumerator Macros Modules Pages
afw_file_journal.c
Go to the documentation of this file.
1 // See the 'COPYING' file in the project root for licensing information.
2 /*
3  * Adaptive framework file journal support
4  *
5  * Copyright (c) 2010-2023 Clemson University
6  *
7  */
8 
14 #include "afw_internal.h"
15 
16 
17 
18 /* Declares and rti/inf defines for interface afw_adaptor_journal */
19 #define AFW_IMPLEMENTATION_ID "file"
21 
22 
23 /* afw_adaptor_journal lock holds UTC time of last use. */
/*
 * add_entry reads a record of this layout from the journal lock file,
 * compares century/year/month/day/hour with the current UTC time to detect
 * a switch to a new hourly journal file, and then writes the record back.
 *
 * NOTE(review): the struct's opening and closing lines (presumably a
 * typedef naming afw_adaptor_journal_lock_t) are missing from this listing -
 * confirm against the real source.
 */
25  afw_endian_big_uint32_t usec; /* (0-999999) microseconds past sec, big-endian */
26  unsigned char sec; /* (0-61) seconds past min */
27  unsigned char min; /* (0-59) minutes past hour */
28  unsigned char hour; /* (0-23) hours past midnight */
29  unsigned char day; /* (1-31) day of the month */
30  unsigned char month; /* (1-12) month of the year */
31  unsigned char year; /* (0-99) year past century */
32  unsigned char century; /* (19-*) century, e.g. 20 for the year 2016 */
33  unsigned char filler; /* filler - ignore (add_entry sets it to 0) */
35 
36 
37 const afw_adaptor_journal_inf_t * afw_file_internal_get_journal_inf()
38 {
39  return &impl_afw_adaptor_journal_inf;
40 }
41 
42 /*
43  * Convert entry_object_id to relative_entry_path_z
44  *
45  * param entry_object_id
46  * param relative_entry_path_wa_z to be updated
47  * param offset to be set
48  * param xctx
49  * return pointer to updated relative_entry_path_wa_z or NULL if error.
50  *
51  * relative_entry_path_wa_z must be size IMPL_RELATIVE_ENTRY_PATH_WA_Z_SIZE
52  *
53  * Convert entry_object_id to relative journal file path and offset.
54  *
55  * Token format: ccyymmddhh_offset.
56  *
57  * Relative path format: 'y'ccyy/'m'mm/'d'dd/'h'hh
58  *
59  * For example if cursor is: 2016070423_3183
60  *
61  * Relative path to journal file will be: y2016/m07/d04/h23
62  *
63  * And journal entry will be at offset 3183.
64  *
65  * Note: this doesn't strictly check cursor syntax because open will
66  * take care of not found. Should probably do more checking.
67  */
68 #define IMPL_RELATIVE_ENTRY_PATH_WA_Z_SIZE sizeof("y2016/m07/d04/h23")
69 static afw_utf8_z_t *
70 impl_object_id_to_relative_entry_path(
71  const afw_utf8_t *entry_object_id,
72  afw_utf8_z_t relative_entry_path_wa_z[],
73  afw_off_t *offset,
74  afw_xctx_t *xctx)
75 {
76  const afw_utf8_z_t *i;
77  afw_utf8_z_t *o;
78  afw_size_t count;
79 
80  if (entry_object_id->len < strlen("ccyymmddhh_0")) return NULL;
81  o = &relative_entry_path_wa_z[0];
82  i = entry_object_id->s;
83  *o++ = 'y';
84  *o++ = *i++;
85  *o++ = *i++;
86  *o++ = *i++;
87  *o++ = *i++;
88  *o++ = '/';
89  *o++ = 'm';
90  *o++ = *i++;
91  *o++ = *i++;
92  *o++ = '/';
93  *o++ = 'd';
94  *o++ = *i++;
95  *o++ = *i++;
96  *o++ = '/';
97  *o++ = 'h';
98  *o++ = *i++;
99  *o++ = *i++;
100  *o = 0;
101  if (*i++ != '_') return NULL;
102  for (*offset = 0, count = entry_object_id->len - strlen("ccyymmddhh_");
103  count > 0; count--)
104  {
105  *offset = *offset * 10 + (*i++ - '0');
106  }
107 
108  return &relative_entry_path_wa_z[0];
109 }
110 
111 
112 /*
113  * Convert relative_entry_path_z to entry_object_id.
114  *
115  * param z relative_entry_path_z
116  * param offset
117  * param xctx
118  * return entry_object_id
119  *
120  * Example: 1
121  * 01234567890123456
122  * relative_entry_path_z y2016/m07/d04/h23 offset 1234
123  *
124  * yields entry_object_id 2016070423_1234
125  */
126 AFW_DEFINE_STATIC_INLINE(const afw_utf8_t *)
127 impl_relative_entry_path_to_object_id(
128  const afw_utf8_z_t *z, afw_off_t offset, afw_xctx_t *xctx)
129 {
130  return afw_utf8_printf(xctx->p, xctx,
131  "%.4s%.2s%.2s%.2s_%" AFW_INTEGER_FMT,
132  z + 1, z + 7, z + 11, z + 15, (afw_integer_t)offset);
133 }
134 
/*
 * Open a consumer's peer file and load the peer object stored in it.
 *
 * param journal whose session supplies the adaptor
 * param consumer_id used to build the peer file path and passed as the
 *   source location when the file content is converted
 * param peer_f receives the open apr file; this function does not close it
 * param full_peer_path_z receives the full path of the peer file
 * param xctx
 * return the object held by the converted value
 *
 * The file size comes from apr_stat(), the whole file is read into one
 * buffer, and the buffer is converted with the adaptor's content type. Any
 * failure ends at one of the labels at the bottom, which throw.
 *
 * NOTE(review): the embedded listing numbers skip 143, 157, 193 and 201, so
 * the declaration of `session` and part of each format string are not
 * visible here - confirm against the real source.
 */
135 static const afw_object_t *
136 impl_open_and_retrieve_peer_object(
137  const afw_adaptor_journal_t *journal,
138  const afw_utf8_t *consumer_id,
139  apr_file_t * *peer_f,
140  const afw_utf8_z_t * *full_peer_path_z,
141  afw_xctx_t *xctx)
142 {
144  (afw_file_internal_adaptor_session_t *)journal->session;
145  afw_file_internal_adaptor_t *adaptor = session->adaptor;
146  const afw_pool_t *p = xctx->p;
147  apr_pool_t * apr_p = afw_pool_get_apr_pool(p);
148  void *memory;
149  afw_memory_t buffer;
150  apr_finfo_t finfo;
151  const afw_value_t *value;
152  afw_error_footprint_t footprint;
153  apr_status_t rv;
154  afw_size_t len;
155 
    /* Full path is built from the adaptor root and the consumer id. */
156  *full_peer_path_z = afw_utf8_z_printf(p, xctx,
158  "/%" AFW_UTF8_FMT,
159  AFW_UTF8_FMT_ARG(adaptor->root),
160  AFW_UTF8_FMT_ARG(consumer_id));
161 
    /* Only the size is requested; it determines how much to read below. */
162  AFW_ERROR_FOOTPRINT("apr_stat()");
163  rv = apr_stat(&finfo, *full_peer_path_z, APR_FINFO_SIZE,
164  afw_pool_get_apr_pool(xctx->p));
165  if (rv != APR_SUCCESS) goto error_peer_apr;
166 
    /* Opened for read and write: the caller later rewrites the same file. */
167  AFW_ERROR_FOOTPRINT("apr_file_open()");
168  rv = apr_file_open(peer_f, *full_peer_path_z,
169  APR_FOPEN_READ + APR_FOPEN_WRITE + APR_FOPEN_BINARY,
170  APR_FPROT_OS_DEFAULT, apr_p);
171  if (rv != APR_SUCCESS) goto error_peer_apr;
172 
    /* Read the entire file; a short read is treated as an error. */
173  AFW_ERROR_FOOTPRINT("apr_file_read()");
174  buffer.size = afw_safe_cast_off_to_size(finfo.size, xctx);
175  len = buffer.size;
176  memory = afw_xctx_calloc(len, xctx);
177  buffer.ptr = memory;
178  rv = apr_file_read(*peer_f, memory, &len);
179  if (rv != APR_SUCCESS) goto error_peer;
180  if (len != finfo.size) goto error_peer;
181 
    /* Convert the raw bytes; anything other than an object is an error. */
182  AFW_ERROR_FOOTPRINT("afw_content_type_raw_to_value()");
183  value = afw_content_type_raw_to_value(adaptor->content_type, &buffer,
184  consumer_id, xctx->p, xctx);
185  if (!value || !afw_value_is_object(value)) goto error_peer;
186 
187  /* Return peer object. */
188  return ((const afw_value_object_t *)value)->internal;
189 
    /*
     * Failure reported without an rv. Note that a failing apr_file_read()
     * also comes here, so its rv is not included in the error.
     * NOTE(review): presumably the throw macro does not return, so control
     * never falls through into error_peer_apr - confirm.
     */
190 error_peer:
191  AFW_THROW_ERROR_FOOTPRINT_FZ(general, xctx,
192  "Error detected processing adaptor %" AFW_UTF8_FMT " "
194  " file %s - %s",
195  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
196  *full_peer_path_z, footprint.z);
197 
    /* Failure of an APR call; rv is passed along with the message. */
198 error_peer_apr:
199  AFW_THROW_ERROR_FOOTPRINT_RV_FZ(general, apr, rv, xctx,
200  "Error detected processing adaptor %" AFW_UTF8_FMT " "
202  " file %s - %s",
203  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
204  *full_peer_path_z, footprint.z);
205 }
206 
207 
/*
 * Replace the content of an already open peer file with the encoded peer
 * object and close the file.
 *
 * param journal whose session supplies the adaptor
 * param peer object to encode with the adaptor's content type
 * param peer_f open peer file; closed by this function on success
 * param full_peer_path_z used only in error messages
 * param xctx
 *
 * Sequence: seek to offset 0, write the encoded bytes, truncate the file to
 * the encoded size (so leftovers of a longer previous version disappear),
 * close. Every failure goes to error_peer_apr, which throws.
 *
 * NOTE(review): the embedded listing numbers skip 216, 226 and 261, so the
 * declaration of `session`, the head of the call that assigns `encoded`,
 * and part of the format string are not visible here - confirm against the
 * real source.
 */
208 static void
209 impl_write_and_close_peer_object(
210  const afw_adaptor_journal_t *journal,
211  const afw_object_t * peer,
212  apr_file_t *peer_f,
213  const afw_utf8_z_t *full_peer_path_z,
214  afw_xctx_t *xctx)
215 {
217  (afw_file_internal_adaptor_session_t *)journal->session;
218  afw_file_internal_adaptor_t *adaptor = session->adaptor;
219  afw_error_footprint_t footprint;
220  apr_status_t rv;
221  const afw_memory_t *encoded;
222  afw_size_t len;
223  apr_off_t offset;
224 
225  /* Encode peer object. */
227  adaptor->content_type, peer, NULL, xctx->p, xctx);
228 
229  /* Seek to start of peer file. */
230  AFW_ERROR_FOOTPRINT("apr_file_seek()");
231  offset = 0;
232  rv = apr_file_seek(peer_f, APR_SET, &offset);
233  if (rv != APR_SUCCESS) goto error_peer_apr;
234 
235  /* Write to store. */
236  AFW_ERROR_FOOTPRINT("apr_file_write()");
237  len = encoded->size;
238  rv = apr_file_write(peer_f, encoded->ptr, &len);
239  if (rv != APR_SUCCESS) goto error_peer_apr;
    /*
     * apr_file_write() updates len to the number of bytes written.
     * NOTE(review): on this path rv is APR_SUCCESS, yet the error is thrown
     * through the rv-reporting label - confirm that is intended.
     */
240  if (len != encoded->size) {
241  AFW_ERROR_FOOTPRINT("apr_file_write() wrong len");
242  goto error_peer_apr;
243  }
244 
245  /* Truncate file to length written. */
246  AFW_ERROR_FOOTPRINT("apr_file_trunc()");
247  rv = apr_file_trunc(peer_f, encoded->size);
248  if (rv != APR_SUCCESS) goto error_peer_apr;
249 
250  /* Close peer object. */
251  AFW_ERROR_FOOTPRINT("apr_file_close()");
252  rv = apr_file_close(peer_f);
253  if (rv != APR_SUCCESS) goto error_peer_apr;
254 
255  /* Return. */
256  return;
257 
    /* Single error exit; the message carries the last footprint set. */
258 error_peer_apr:
259  AFW_THROW_ERROR_FOOTPRINT_RV_FZ(general, apr, rv, xctx,
260  "Error detected processing adaptor %" AFW_UTF8_FMT " "
262  " file %s - %s",
263  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
264  full_peer_path_z, footprint.z);
265 }
266 
267 
268 /*
269  * Implementation of method add_entry of interface afw_adaptor_journal.
270  */
/*
 * Appends one entry to the hourly journal file for the current UTC time and
 * returns its cursor in the form ccyymmddhh_offset.
 *
 * Steps, in the order the code performs them:
 *  1. Encode entry with the adaptor's content type.
 *  2. Build the relative path y<ccyy>/m<mm>/d<dd>/h<hh> from the current
 *     UTC time.
 *  3. Open (creating if needed) the journal lock file and read the lock
 *     record. APR_EOF on that read means no entry was written before; in
 *     that case the relative path is saved in path_to_first_journal_file.
 *  4. If the hour in the lock record differs from now, remember the path of
 *     the old journal file.
 *  5. Append a big-endian uint64 length followed by the encoded entry to the
 *     current journal file.
 *  6. On a switch, append a zero length followed by the new relative path to
 *     the old journal file so a reader can follow the chain.
 *  7. Rewrite the lock record and close the lock file.
 *
 * All failures end at one of the labels at the bottom, which throw.
 *
 * NOTE(review): the embedded listing numbers skip 272, 278, 293, 303-304,
 * 306-307, 353, 381, 400 and 409, so the function name line, the
 * declarations of `session` and `lock`, the head of the call assigning
 * `encoded`, and the leading part of several format strings are not visible
 * here - confirm against the real source.
 */
271 const afw_utf8_t *
273  const afw_adaptor_journal_t * instance,
274  const afw_adaptor_impl_request_t * impl_request,
275  const afw_object_t * entry,
276  afw_xctx_t *xctx)
277 {
279  (afw_file_internal_adaptor_session_t *)instance->session;
280  afw_file_internal_adaptor_t *adaptor = session->adaptor;
281  const afw_memory_t *encoded;
282  afw_memory_t temp_raw;
283  apr_status_t rv;
284  apr_time_exp_t now;
285  const afw_utf8_z_t *relative_entry_path_z;
286  const afw_utf8_z_t *full_entry_path_z;
287  const afw_utf8_z_t *full_entry_dir_path_z;
288  const afw_utf8_z_t *old_full_entry_path_z;
289  const afw_utf8_t *first_entry_save_path;
290  const afw_utf8_t *cursor;
291  apr_file_t *lock_f;
292  apr_file_t *entry_f;
294  apr_size_t lock_len;
295  apr_off_t offset;
296  afw_endian_big_uint64_t encoded_len_be;
297  const afw_pool_t *p = xctx->p;
298  apr_pool_t *apr_p = afw_pool_get_apr_pool(p);
299  afw_boolean_t first_entry;
300  apr_size_t len;
301  afw_error_footprint_t footprint;
302 
305  /* Encode object using content type of adaptor. */
308  adaptor->content_type, entry, NULL, xctx->p, xctx);
    /* The length prefix is kept in big-endian form for writing to disk. */
309  encoded_len_be = afw_endian_native_to_big_uint64(encoded->size);
310 
311  /* Determine relative path from root for this entry. */
312  rv = apr_time_exp_gmt(&now, apr_time_now());
313  if (rv != APR_SUCCESS) {
314  AFW_ERROR_FOOTPRINT("apr_time_exp_gmt()");
315  AFW_THROW_ERROR_RV_Z(general, apr, rv, "apr_time_exp_gmt() failed",
316  xctx);
317  }
    /* tm_year counts from 1900 and tm_mon from 0, hence the adjustments. */
318  relative_entry_path_z = afw_utf8_z_printf(p, xctx,
319  "y%04d/m%02d/d%02d/h%02d",
320  now.tm_year + 1900, now.tm_mon + 1, now.tm_mday, now.tm_hour);
321 
322  /* Open lock file creating it if needed. */
    /*
     * NOTE(review): later comments speak of this file as a lock that is
     * released on close, but no locking flag or apr_file_lock() call is
     * visible in this listing - confirm how exclusivity is obtained.
     */
323  AFW_ERROR_FOOTPRINT("apr_file_open()");
324  rv = apr_file_open(&lock_f, adaptor->journal_lock_file_path_z,
325  APR_FOPEN_READ + APR_FOPEN_CREATE + APR_FOPEN_WRITE +
326  APR_FOPEN_BINARY, APR_FPROT_OS_DEFAULT, apr_p);
327  if (APR_STATUS_IS_ENOENT(rv)) {
328  /* Create parent directories and try again. */
329  AFW_ERROR_FOOTPRINT("apr_dir_make_recursive()");
330  rv = apr_dir_make_recursive(adaptor->journal_dir_path_z,
331  APR_FPROT_OS_DEFAULT, apr_p);
332  if (rv == APR_SUCCESS) {
333  /* Try again. */
334  AFW_ERROR_FOOTPRINT("apr_file_open()");
335  rv = apr_file_open(&lock_f, adaptor->journal_lock_file_path_z,
336  APR_FOPEN_READ + APR_FOPEN_CREATE + APR_FOPEN_WRITE +
337  APR_FOPEN_BINARY, APR_FPROT_OS_DEFAULT, apr_p);
338  }
339  }
340  if (rv != APR_SUCCESS) goto error_lock_apr;
341 
342  /*
343  * Read lock record and determine if new lock file. If it is new,
344  * save the first relative_entry_path in path_to_first_entry.
345  */
346  AFW_ERROR_FOOTPRINT("apr_file_read()");
347  lock_len = sizeof(afw_adaptor_journal_lock_t);
348  rv = apr_file_read(lock_f, &lock, &lock_len);
    /* An empty (just created) lock file reads as APR_EOF. */
349  first_entry = rv == APR_EOF;
350  if (!first_entry && rv != APR_SUCCESS) goto error_lock_apr;
351  if (first_entry) {
352  first_entry_save_path = afw_utf8_printf(xctx->p, xctx,
354  "/path_to_first_journal_file",
355  AFW_UTF8_FMT_ARG(adaptor->root));
    /* The saved content is the path text without a terminating 0. */
356  temp_raw.ptr = (const afw_byte_t *)relative_entry_path_z;
357  temp_raw.size = strlen(relative_entry_path_z);
358  afw_file_from_memory(first_entry_save_path, &temp_raw,
359  afw_file_mode_write, xctx);
360  }
361 
362  /* Make sure lock record was completely read. */
363  AFW_ERROR_FOOTPRINT("check lock record");
364  if (rv == APR_SUCCESS && lock_len != sizeof(afw_adaptor_journal_lock_t))
365  goto error_lock;
366 
367  /*
368  * If year, mon, day, and hour are not the same as last time, need to
369  * point current journal file last entry to new journal.
370  */
    /* A non-NULL old_full_entry_path_z is the "switch occurred" flag. */
371  old_full_entry_path_z = NULL;
372  if (!first_entry &&
373  ( lock.century != now.tm_year / 100 + 19
374  || lock.year != now.tm_year % 100
375  || lock.month != now.tm_mon + 1
376  || lock.day != now.tm_mday
377  || lock.hour != now.tm_hour)
378  )
379  {
380  old_full_entry_path_z = afw_utf8_z_printf(p, xctx,
382  "/y%02d%02d/m%02d/d%02d/h%02d",
383  AFW_UTF8_FMT_ARG(adaptor->root),
384  lock.century, lock.year, lock.month, lock.day, lock.hour);
385  }
386 
387  /* Update lock struct. */
    /* From here on `lock` describes the current time, not the old record. */
388  lock.filler = 0;
389  lock.century = (now.tm_year / 100) + 19;
390  lock.year = now.tm_year % 100;
391  lock.month = now.tm_mon + 1;
392  lock.day = now.tm_mday;
393  lock.hour = now.tm_hour;
394  lock.min = now.tm_min;
395  lock.sec = now.tm_sec;
396  lock.usec = afw_endian_native_to_big_uint32(now.tm_usec);
397 
398  /* Path to file that will hold event. */
399  full_entry_path_z = afw_utf8_z_printf(p, xctx,
401  AFW_UTF8_FMT_ARG(adaptor->root), relative_entry_path_z);
402 
403  /*
404  * If switching journal file or first entry, make sure all directories
405  * exist.
406  */
407  if (old_full_entry_path_z || first_entry) {
    /* Directory part only: the h<hh> component is the file itself. */
408  full_entry_dir_path_z = afw_utf8_z_printf(p, xctx,
410  "/y%02d%02d/m%02d/d%02d/",
411  AFW_UTF8_FMT_ARG(adaptor->root),
412  lock.century, lock.year, lock.month, lock.day);
413  AFW_ERROR_FOOTPRINT("apr_dir_make_recursive()");
414  rv = apr_dir_make_recursive(full_entry_dir_path_z,
415  APR_FPROT_OS_DEFAULT, apr_p);
416  if (rv != APR_SUCCESS) goto error_journal_apr;
417  }
418 
419  /* Open journal entry file. */
420  AFW_ERROR_FOOTPRINT("apr_file_open()");
421  rv = apr_file_open(&entry_f, full_entry_path_z,
422  APR_FOPEN_CREATE +
423  APR_FOPEN_WRITE +
424  APR_FOPEN_BINARY +
425  APR_FOPEN_APPEND,
426  APR_FPROT_OS_DEFAULT, apr_p);
427  if (rv != APR_SUCCESS) goto error_journal_apr;
428 
429  /* Determine cursor of entry. */
    /*
     * A zero-distance APR_CUR seek reports the current position, which is
     * used as the entry's offset in the cursor.
     * NOTE(review): this is taken before the first write on a file opened
     * with APR_FOPEN_APPEND; whether the position is already end-of-file at
     * that point may be platform dependent - confirm.
     */
430  AFW_ERROR_FOOTPRINT("apr_file_seek()");
431  offset = 0;
432  rv = apr_file_seek(entry_f, APR_CUR, &offset);
433  if (rv != APR_SUCCESS) goto error_journal_apr;
434  cursor = afw_utf8_printf(xctx->p, xctx,
435  "%02d%02d%02d%02d%02d_%" AFW_INTEGER_FMT,
436  lock.century, lock.year, lock.month, lock.day, lock.hour,
437  (afw_integer_t)offset);
438 
439  /* Write entry prefix (length). */
440  AFW_ERROR_FOOTPRINT("apr_file_write()");
441  len = sizeof(encoded_len_be);
442  rv = apr_file_write(entry_f, &encoded_len_be, &len);
443  if (rv != APR_SUCCESS) goto error_journal_apr;
444 
445  /* Write entry . */
    /*
     * NOTE(review): unlike the peer file write, the number of bytes actually
     * written (len) is not compared with the requested size here.
     */
446  AFW_ERROR_FOOTPRINT("apr_file_write()");
447  len = encoded->size;
448  rv = apr_file_write(entry_f, encoded->ptr, &len);
449  if (rv != APR_SUCCESS) goto error_journal_apr;
450 
451  /* Close journal. */
452  AFW_ERROR_FOOTPRINT("apr_file_close()");
453  rv = apr_file_close(entry_f);
454  if (rv != APR_SUCCESS) goto error_journal_apr;
455 
456  /*
457  * If a switch occurred, write last record of old file with
458  * an entry length of 0 followed by relative_entry_path_z
459  * of the now new current file.
460  */
461  if (old_full_entry_path_z) {
462  /* Open old journal. */
    /* No APR_FOPEN_CREATE: the old journal file is expected to exist. */
463  AFW_ERROR_FOOTPRINT("apr_file_open()");
464  rv = apr_file_open(&entry_f, old_full_entry_path_z,
465  APR_FOPEN_WRITE +
466  APR_FOPEN_BINARY +
467  APR_FOPEN_APPEND,
468  APR_FPROT_OS_DEFAULT, apr_p);
469  if (rv != APR_SUCCESS) goto error_old_journal_apr;
470 
471  /* Write 0 to old journal prefix (length). */
472  AFW_ERROR_FOOTPRINT("apr_file_write()");
473  len = sizeof(encoded_len_be);
474  encoded_len_be.i = 0;
475  rv = apr_file_write(entry_f, &encoded_len_be, &len);
476  if (rv != APR_SUCCESS) goto error_old_journal_apr;
477 
478  /* Write relative path to new journal. */
    /* strlen() bytes are written, so no terminating 0 goes to the file. */
479  AFW_ERROR_FOOTPRINT("apr_file_write()");
480  len = strlen(relative_entry_path_z);
481  rv = apr_file_write(entry_f, relative_entry_path_z, &len);
482  if (rv != APR_SUCCESS) goto error_old_journal_apr;
483 
484  /* Close old journal. */
485  AFW_ERROR_FOOTPRINT("apr_file_close()");
486  rv = apr_file_close(entry_f);
487  if (rv != APR_SUCCESS) goto error_old_journal_apr;
488  }
489 
490  /* Write the new lock struct and close so lock is released. */
491 
492  /* Seek to start of lock file. */
493  AFW_ERROR_FOOTPRINT("apr_file_seek()");
494  offset = 0;
495  rv = apr_file_seek(lock_f, APR_SET, &offset);
496  if (rv != APR_SUCCESS) goto error_lock_apr;
497 
498  /* Replace lock. */
499  AFW_ERROR_FOOTPRINT("apr_file_write()");
500  len = sizeof(lock);
501  rv = apr_file_write(lock_f, &lock, &len);
502  if (rv != APR_SUCCESS) goto error_lock_apr;
503 
504  /* Close lock file which unblocks new journal entry adds. */
505  AFW_ERROR_FOOTPRINT("apr_file_close()");
506  rv = apr_file_close(lock_f);
507  if (rv != APR_SUCCESS) goto error_lock_apr;
508 
509  /* Return cursor. */
510  return cursor;
511 
    /*
     * Error exits. Each label throws with the last footprint set; the labels
     * differ in which file path is reported and whether rv is included.
     * None of them closes lock_f or entry_f explicitly.
     * NOTE(review): presumably the throw macros do not return, so one label
     * never falls through into the next - confirm.
     */
512 error_lock_apr:
513  AFW_THROW_ERROR_FOOTPRINT_RV_FZ(general, apr, rv, xctx,
514  "Error detected processing adaptor %" AFW_UTF8_FMT
515  " journal lock file %s - %s",
516  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
517  adaptor->journal_lock_file_path_z, footprint.z);
518 
519 error_lock:
520  AFW_THROW_ERROR_FOOTPRINT_FZ(general, xctx,
521  "Error detected while processing "
522  "adaptor %" AFW_UTF8_FMT " journal lock file %s - %s",
523  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
524  adaptor->journal_lock_file_path_z, footprint.z);
525 
526 error_journal_apr:
527  AFW_THROW_ERROR_FOOTPRINT_RV_FZ(general, apr, rv, xctx,
528  "Error detected processing adaptor %" AFW_UTF8_FMT
529  " journal file %s - %s",
530  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
531  full_entry_path_z, footprint.z);
532 
533 error_old_journal_apr:
534  AFW_THROW_ERROR_FOOTPRINT_RV_FZ(general, apr, rv, xctx,
535  "Error detected processing adaptor %" AFW_UTF8_FMT
536  " journal file %s - %s",
537  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
538  old_full_entry_path_z, footprint.z);
539 }
540 
541 
542 /*
543  * Implementation of method get_entry of interface afw_adaptor_journal.
544  */
/*
 * Locates a journal entry according to `option` and reports it through
 * properties set on `response`:
 *  - entryCursor is always set to the cursor where the scan stopped;
 *  - entry is set when an applicable entry was found and the request was not
 *    an advance-cursor request;
 *  - reissue is set to true when a consumer's pending consumeCursor entry is
 *    returned again.
 *
 * The scan starts either at the first journal file (get_first) or at the
 * file and offset decoded from a cursor. Each record is a big-endian uint64
 * length followed by that many bytes; a length of 0 is followed by the
 * relative path of the next journal file, which the loop then opens.
 * Reaching end of file ends the scan with nothing applicable.
 *
 * For consumer options the peer object is loaded first, its cursor
 * properties are updated afterwards, and it is written back and closed.
 *
 * NOTE(review): the embedded listing numbers skip 546, 549, 556, 617, 621,
 * 626, 632, 639, 645, 692, 696, 748, 794, 822, 827, 839, 841, 843 and 851,
 * so the function name line, the `option` parameter, the declaration of
 * `session`, every case label and the head of several calls are not visible
 * here - confirm against the real source.
 * NOTE(review): `limit` is not referenced in any visible line.
 */
545 void
547  const afw_adaptor_journal_t * instance,
548  const afw_adaptor_impl_request_t * impl_request,
550  const afw_utf8_t * consumer_id,
551  const afw_utf8_t * entry_cursor,
552  afw_size_t limit,
553  const afw_object_t * response,
554  afw_xctx_t *xctx)
555 {
557  (afw_file_internal_adaptor_session_t *)instance->session;
558  afw_file_internal_adaptor_t *adaptor = session->adaptor;
559  const afw_pool_t *p = xctx->p;
560  apr_pool_t *apr_p = afw_pool_get_apr_pool(p);
561 
562  afw_error_footprint_t footprint;
563  apr_file_t *entry_f;
564  apr_file_t *peer_f;
565  const afw_utf8_t *entry_object_id;
566  const afw_object_t *peer;
567  const afw_object_t *entry;
568  const afw_value_t *value;
569  const afw_utf8_t *currentCursor;
570  const afw_utf8_t *consumeCursor;
571  const afw_utf8_t *advanceCursor;
572  const afw_utf8_t *first_entry_save_path;
573  afw_boolean_t get_first;
574  afw_boolean_t skip_first_entry;
575  afw_boolean_t use_consumer_cursors;
576  afw_boolean_t advance_consumer_cursor;
577  afw_boolean_t use_consumer;
578  afw_boolean_t reissue;
579  afw_boolean_t check_filter;
580  afw_boolean_t open_journal;
581  afw_boolean_t applicable;
582  const afw_utf8_z_t *relative_entry_path_z;
583  afw_utf8_z_t relative_entry_path_wa_z[IMPL_RELATIVE_ENTRY_PATH_WA_Z_SIZE];
584  const afw_utf8_z_t *full_entry_path_z;
585  const afw_utf8_z_t *full_peer_path_z;
586  afw_off_t offset;
587  afw_size_t len;
588  apr_status_t rv;
589  afw_endian_big_uint64_t encoded_len_be;
590  afw_size_t encoded_len;
591  void *memory;
592  afw_memory_t buffer;
593  const afw_value_t * filter;
594  const afw_dateTime_t *now;
595 
596  /* Initialize variables. */
597  get_first = false;
598  skip_first_entry = false;
599  reissue = false;
600  use_consumer = false;
601  use_consumer_cursors = false;
602  advance_consumer_cursor = false;
603  check_filter = false;
604  applicable = false;
605  entry_object_id = NULL;
606  filter = NULL;
607  peer_f = NULL;
608  peer = NULL;
609  full_peer_path_z = NULL;
610  full_entry_path_z = NULL;
611  entry_f = NULL;
612  entry = NULL;
613 
614  /* Set variables based on options */
    /*
     * The case label of each group below is missing from this listing. The
     * option names in the notes are inferred from the flags each group sets
     * and are to be confirmed.
     */
615  switch (option) {
616 
    /* Presumably get_first: start at the first journal file. */
618  get_first = true;
619  break;
620 
    /* Presumably get_next_after_cursor: skip the entry at the cursor. */
622  skip_first_entry = true;
623  entry_object_id = entry_cursor;
624  break;
625 
    /* Presumably get_next_for_consumer: start from the peer's cursors. */
627  use_consumer = true;
628  use_consumer_cursors = true;
629  check_filter = true;
630  break;
631 
    /* Presumably get_next_for_consumer_after_cursor. */
633  skip_first_entry = true;
634  entry_object_id = entry_cursor;
635  use_consumer = true;
636  check_filter = true;
637  break;
638 
    /* Presumably advance_cursor_for_consumer. */
640  use_consumer = true;
641  check_filter = true;
642  advance_consumer_cursor = true;
643  break;
644 
    /* Presumably get_by_cursor: return exactly the entry at the cursor. */
646  entry_object_id = entry_cursor;
647  break;
648 
649  default:
650  AFW_THROW_ERROR_FZ(general, xctx,
651  "Invalid option %d passed to afw_adaptor_journal_get_entry()",
652  option);
653  };
654 
655  /* If using peer object, get it and set variables as appropriate. */
656  if (use_consumer) {
657 
658  /* Get peer object. */
    /* This leaves peer_f open until the peer is written back below. */
659  peer = impl_open_and_retrieve_peer_object(instance, consumer_id,
660  &peer_f, &full_peer_path_z, xctx);
661 
662  /* If using consumer cursors, set variables as appropriate. */
663  if (use_consumer_cursors) {
664  currentCursor = afw_object_old_get_property_as_string(peer,
665  &afw_s_currentCursor, xctx);
666  consumeCursor = afw_object_old_get_property_as_string(peer,
667  &afw_s_consumeCursor, xctx);
668  advanceCursor = afw_object_old_get_property_as_string(peer,
669  &afw_s_advanceCursor, xctx);
    /*
     * Precedence: an entry still being consumed is reissued as is (no
     * filter check); otherwise resume at advanceCursor (entry included),
     * otherwise after currentCursor, otherwise from the very beginning.
     */
670  if (consumeCursor) {
671  entry_object_id = consumeCursor;
672  check_filter = false;
673  reissue = true;
674  }
675  else if (advanceCursor) {
676  entry_object_id = advanceCursor;
677  skip_first_entry = false;
678  }
679  else if (currentCursor) {
680  entry_object_id = currentCursor;
681  skip_first_entry = true;
682  }
683  else {
684  get_first = true;
685  }
686  }
687  }
688 
689  /* If get_first, get path to first journal file. */
690  if (get_first) {
691  first_entry_save_path = afw_utf8_printf(xctx->p, xctx,
693  "/path_to_first_journal_file",
694  AFW_UTF8_FMT_ARG(adaptor->root));
    /* The saved file holds the relative path written by add_entry. */
695  relative_entry_path_z = afw_utf8_to_utf8_z(
697  afw_file_to_memory(first_entry_save_path, 0, p, xctx),
698  p, xctx),
699  p, xctx);
700  offset = 0;
701  }
702 
703  /* Convert entry_object_id and offset to relative journal file path. */
704  else if (entry_object_id) {
705  relative_entry_path_z = impl_object_id_to_relative_entry_path(
706  entry_object_id, relative_entry_path_wa_z, &offset, xctx);
    /*
     * A malformed cursor ends the call without setting anything on
     * response.
     * NOTE(review): when use_consumer is set, peer_f is still open at this
     * return - confirm that is acceptable.
     */
707  if (!relative_entry_path_z) return;
708  }
709 
710  else {
711  AFW_THROW_ERROR_Z(general,
712  "get_first or entry_object_id required",
713  xctx);
714  }
715 
716  /* Loop until end or an applicable entry found. */
717  open_journal = true;
718  for (;;) {
719 
720  /* Make entry_object_id for the entry. */
    /* Recomputed every pass, so after the loop it names the stop point. */
721  entry_object_id = impl_relative_entry_path_to_object_id(
722  relative_entry_path_z, offset, xctx);
723 
724  /* If needed, open journal file. */
725  if (open_journal) {
726  open_journal = false;
727  full_entry_path_z = afw_utf8_z_concat(p, xctx,
728  adaptor->journal_dir_path_z,
729  relative_entry_path_z, NULL);
730  AFW_ERROR_FOOTPRINT("apr_file_open()");
731  rv = apr_file_open(&entry_f, full_entry_path_z,
732  APR_FOPEN_READ + APR_FOPEN_BINARY, APR_FPROT_OS_DEFAULT,
733  apr_p);
734  if (rv != APR_SUCCESS) goto error_journal_apr;
735  }
736 
737  /* Position to offset and read length. */
738  AFW_ERROR_FOOTPRINT("apr_file_seek()");
739  rv = apr_file_seek(entry_f, APR_SET, &offset);
740  if (rv != APR_SUCCESS) goto error_journal_apr;
741  len = sizeof(encoded_len_be);
742  AFW_ERROR_FOOTPRINT("apr_file_read()");
743  rv = apr_file_read(entry_f, &encoded_len_be, &len);
744  if (APR_STATUS_IS_EOF(rv)) break; /* Nothing applicable. */
745  if (rv != APR_SUCCESS) goto error_journal_apr;
746  AFW_ERROR_FOOTPRINT("check if encoded_len_be truncated");
747  if (len != sizeof(encoded_len_be)) goto error_journal;
749  encoded_len_be, xctx);
750  len = encoded_len;
751 
752  /* If length is 0, switch to next journal file. */
753  if (len == 0) {
    /*
     * NOTE(review): up to sizeof(relative_entry_path_wa_z) bytes are read,
     * while add_entry wrote strlen() bytes with no terminating 0, and this
     * buffer has no initializer. Nothing visible here guarantees the byte
     * after the path is 0 before it is used as a z-string - confirm.
     */
754  AFW_ERROR_FOOTPRINT("apr_file_read()");
755  len = sizeof(relative_entry_path_wa_z);
756  rv = apr_file_read(entry_f, &relative_entry_path_wa_z[0], &len);
757  if (rv != APR_SUCCESS) goto error_journal_apr;
758  relative_entry_path_z = &relative_entry_path_wa_z[0];
759  AFW_ERROR_FOOTPRINT("apr_file_close()");
760  rv = apr_file_close(entry_f);
761  if (rv != APR_SUCCESS) goto error_journal_apr;
762  offset = 0;
763  open_journal = true;
764  continue;
765  }
766 
767  /* If not skipping first entry, read and process it. */
768  if (!skip_first_entry) {
769 
770  /* Read journal entry. */
771  memory = afw_xctx_calloc(len, xctx);
772  buffer.size = len;
773  buffer.ptr = memory;
774  AFW_ERROR_FOOTPRINT("apr_file_read()");
775  rv = apr_file_read(entry_f, memory, &len);
776  if (rv != APR_SUCCESS) goto error_journal_apr;
777  AFW_ERROR_FOOTPRINT("check that all of encoded entry read");
778  if (len != buffer.size) goto error_journal;
779  value = afw_content_type_raw_to_value(adaptor->content_type, &buffer,
780  consumer_id, p, xctx);
781  AFW_ERROR_FOOTPRINT("check that converted entry is an object");
782  if (!afw_value_is_object(value)) goto error_journal;
783  entry = ((const afw_value_object_t *)value)->internal;
784 
785  /* If reissue, set reissue property and mark applicable. */
786  if (reissue) {
787  applicable = true;
788  afw_object_set_property_as_boolean(response, &afw_s_reissue,
789  true, xctx);
790  }
791 
792  /* Else if check_filter, set applicable using filter. */
793  else if (check_filter) {
795  instance, entry, peer, &filter, xctx);
796  }
797 
798  /* Else entry is applicable. */
799  else {
800  applicable = true;
801  }
802 
803  /* If applicable entry, leave loop. */
804  if (applicable) break;
805  }
806 
807  /* Set offset for next loop. */
    /* Skipping applies only to the first record; then step past it. */
808  skip_first_entry = false;
809  offset = encoded_len + offset + sizeof(encoded_len_be);
810  };
811 
812  AFW_ERROR_FOOTPRINT("apr_file_close()");
813  rv = apr_file_close(entry_f);
814  if (rv != APR_SUCCESS) goto error_journal_apr;
815 
816 
817  /* If use_consumer, finishup. */
818  if (use_consumer) {
819 
820  /* Set peer lastContactTime to now. */
821  now = afw_dateTime_now_utc(p, xctx);
823  &afw_s_lastContactTime, now, xctx);
824 
825  /* If advance_consumer_cursor, set peer advanceCursor */
826  if (advance_consumer_cursor) {
828  &afw_s_advanceCursor, entry_object_id, xctx);
829  }
830 
831  /* If use_consumer_cursors and not reissue, update peer cursors. */
832  else if (use_consumer_cursors && !reissue) {
833 
834  /*
835  * If applicable, set currentCursor and consumeCursor and remove
836  * advanceCursor.
837  */
838  if (applicable) {
840  &afw_s_currentCursor, entry_object_id, xctx);
842  &afw_s_consumeCursor, entry_object_id, xctx);
844  &afw_s_consumeStartTime, now, xctx);
845  afw_object_remove_property(peer, &afw_s_advanceCursor,
846  xctx);
847  }
848 
849  /* If not applicable, set advance cursor to point to eof. */
850  else {
852  &afw_s_advanceCursor, entry_object_id, xctx);
853  }
854 
855  }
856 
857  /* Replace peer object and close. */
858  impl_write_and_close_peer_object(instance, peer, peer_f,
859  full_peer_path_z, xctx);
860 
861  }
862 
863  /* Set entryCursor property. */
864  afw_object_set_property_as_string(response, &afw_s_entryCursor,
865  entry_object_id, xctx);
866 
867  /*
868  * Return entry if there is an applicable one and not
869  * advance_consumer_cursor request.
870  */
871  if (applicable && !advance_consumer_cursor) {
872  afw_object_set_property_as_object(response, &afw_s_entry, entry,
873  xctx);
874  }
875  return;
876 
    /*
     * Error exits; both report full_entry_path_z and the last footprint.
     * NOTE(review): the first passes adaptor_id.len and .s by hand instead
     * of using AFW_UTF8_FMT_ARG as everywhere else - confirm the len
     * argument has the type AFW_UTF8_FMT expects.
     */
877 error_journal_apr:
878  AFW_THROW_ERROR_FOOTPRINT_RV_FZ(general, apr, rv, xctx,
879  "Error detected processing adaptor %" AFW_UTF8_FMT " journal file %s - %s",
880  adaptor->pub.adaptor_id.len, adaptor->pub.adaptor_id.s,
881  full_entry_path_z, footprint.z);
882 
883 error_journal:
884  AFW_THROW_ERROR_FOOTPRINT_FZ(general, xctx,
885  "Error detected while processing "
886  "adaptor %" AFW_UTF8_FMT " journal file %s - %s",
887  AFW_UTF8_FMT_ARG(&adaptor->pub.adaptor_id),
888  full_entry_path_z, footprint.z);
889 
890 }
891 
892 
893 /*
894  * Implementation of method mark_entry_consumed of interface afw_adaptor_journal.
895  */
/*
 * Marks the entry a consumer is currently consuming as consumed.
 *
 * The consumer's peer object is loaded and its consumeCursor must equal
 * entry_cursor; otherwise an error is thrown. On a match the consumeCursor
 * and consumeStartTime properties are removed, lastContactTime is set to the
 * current UTC time, and the peer object is written back and closed.
 *
 * NOTE(review): embedded listing number 897 (the function name line) is
 * missing from this listing - confirm against the real source.
 * NOTE(review): when the cursor check throws, peer_f has been opened and is
 * not closed in this function - confirm how it is released.
 */
896 void
898  const afw_adaptor_journal_t * instance,
899  const afw_adaptor_impl_request_t * impl_request,
900  const afw_utf8_t * consumer_id,
901  const afw_utf8_t * entry_cursor,
902  afw_xctx_t *xctx)
903 {
904  const afw_object_t *peer;
905  apr_file_t *peer_f;
906  const afw_utf8_z_t *full_peer_path_z;
907  const afw_utf8_t *consume_cursor;
908  const afw_dateTime_t *now;
909 
910 
    /* Opens the peer file (left open in peer_f) and loads the object. */
911  peer = impl_open_and_retrieve_peer_object(instance,
912  consumer_id, &peer_f,
913  &full_peer_path_z, xctx);
914 
    /* Only the entry recorded in consumeCursor may be marked consumed. */
915  consume_cursor = afw_object_old_get_property_as_string(peer,
916  &afw_s_consumeCursor, xctx);
917  if (!consume_cursor || !afw_utf8_equal(entry_cursor, consume_cursor)) {
918  AFW_THROW_ERROR_Z(general,
919  "Object id supplied is not currently being consumed", xctx);
920  }
921 
922  /* Remove consume* properties. */
923  afw_object_remove_property(peer, &afw_s_consumeCursor, xctx);
924  afw_object_remove_property(peer, &afw_s_consumeStartTime, xctx);
925 
926  /* Update lastContactTime property. */
927  now = afw_dateTime_now_utc(xctx->p, xctx);
928  afw_object_set_property_as_dateTime(peer, &afw_s_lastContactTime,
929  now, xctx);
930 
931  /* Write peer object and close. */
932  impl_write_and_close_peer_object(instance, peer, peer_f, full_peer_path_z,
933  xctx);
934 }
Interface afw_interface implementation declares.
Adaptive Framework Core Internal.
afw_adaptor_impl_is_journal_entry_applicable(const afw_adaptor_journal_t *instance, const afw_object_t *entry, const afw_object_t *consumer, const afw_value_t *const *filter, afw_xctx_t *xctx)
Determine whether a journal entry is applicable to a consumer.
void impl_afw_adaptor_journal_get_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, afw_adaptor_journal_option_t option, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_size_t limit, const afw_object_t *response, afw_xctx_t *xctx)
const afw_utf8_t * impl_afw_adaptor_journal_add_entry(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_object_t *entry, afw_xctx_t *xctx)
void impl_afw_adaptor_journal_mark_entry_consumed(const afw_adaptor_journal_t *instance, const afw_adaptor_impl_request_t *impl_request, const afw_utf8_t *consumer_id, const afw_utf8_t *entry_cursor, afw_xctx_t *xctx)
afw_object_set_property_as_boolean(const afw_object_t *object, const afw_utf8_t *property_name, afw_boolean_t internal, afw_xctx_t *xctx)
Set property function for data type boolean values.
afw_object_set_property_as_dateTime(const afw_object_t *object, const afw_utf8_t *property_name, const afw_dateTime_t *internal, afw_xctx_t *xctx)
Set property function for data type dateTime values.
#define afw_value_is_object(A_VALUE)
Macro to determine if value is evaluated object.
afw_object_set_property_as_object(const afw_object_t *object, const afw_utf8_t *property_name, const afw_object_t *internal, afw_xctx_t *xctx)
Set property function for data type object values.
#define afw_object_old_get_property_as_string(object, property_name, xctx)
Get property function for data type string value.
afw_object_set_property_as_string(const afw_object_t *object, const afw_utf8_t *property_name, const afw_utf8_t *internal, afw_xctx_t *xctx)
Set property function for data type string values.
#define AFW_UTF8_FMT_ARG(A_STRING)
Convenience Macro for use with AFW_UTF8_FMT to specify arg.
Definition: afw_common.h:605
#define AFW_INTEGER_FMT
Format string specifier used for afw_integer_t.
Definition: afw_common.h:326
_Bool afw_boolean_t
Definition: afw_common.h:373
unsigned char afw_byte_t
A byte of memory (unsigned).
Definition: afw_common.h:208
#define AFW_UTF8_FMT
Format string specifier used for afw_utf8_t.
Definition: afw_common.h:588
afw_utf8_octet_t afw_utf8_z_t
NFC normalized UTF-8 null terminated string.
Definition: afw_common.h:523
apr_size_t afw_size_t
size_t.
Definition: afw_common.h:151
apr_int64_t afw_integer_t
typedef for big signed int.
Definition: afw_common.h:321
enum afw_adaptor_journal_option_e afw_adaptor_journal_option_t
Typedef for afw_adaptor_journal get_entry options enum.
apr_off_t afw_off_t
off_t.
Definition: afw_common.h:154
@ afw_adaptor_journal_option_get_next_for_consumer_after_cursor
afw_adaptor_journal get_entry option get_next_for_consumer_after_cursor
Definition: afw_common.h:1118
@ afw_adaptor_journal_option_get_next_for_consumer
afw_adaptor_journal get_entry option get_next_for_consumer
Definition: afw_common.h:1095
@ afw_adaptor_journal_option_advance_cursor_for_consumer
afw_adaptor_journal get_entry option advance_cursor_for_consumer
Definition: afw_common.h:1145
@ afw_adaptor_journal_option_get_next_after_cursor
afw_adaptor_journal get_entry option get_next_after_cursor
Definition: afw_common.h:1060
@ afw_adaptor_journal_option_get_by_cursor
afw_adaptor_journal get_entry option get_by_cursor
Definition: afw_common.h:1049
@ afw_adaptor_journal_option_get_first
afw_adaptor_journal get_entry option get_first
Definition: afw_common.h:1037
#define afw_content_type_raw_to_value(instance, raw, source_location, p, xctx)
Call method raw_to_value of interface afw_content_type.
#define afw_content_type_object_to_raw(instance, object, options, p, xctx)
Convert object to raw in the specified pool.
afw_size_t afw_endian_safe_big_uint64_to_native_size_t(afw_endian_big_uint64_t big, afw_xctx_t *xctx)
Safe afw_endian_big_uint64_t to native afw_size_t.
Definition: afw_endian.h:183
afw_endian_big_uint64_t afw_endian_native_to_big_uint64(afw_uint64_t native)
native afw_uint64_t to afw_endian_big_uint64_t
Definition: afw_endian.h:234
afw_endian_big_uint32_t afw_endian_native_to_big_uint32(afw_uint32_t native)
native afw_uint32_t to afw_endian_big_uint32_t
Definition: afw_endian.h:292
#define AFW_THROW_ERROR_RV_Z(code, rv_source_id, rv, message_z, xctx)
Macro used to set error and rv in xctx and throw it.
Definition: afw_error.h:301
#define AFW_ERROR_FOOTPRINT(_footprint_)
Set error footprint.
Definition: afw_error.h:171
#define AFW_THROW_ERROR_FZ(code, xctx, format_z,...)
Macro used to set error and 0 rv in xctx and throw it.
Definition: afw_error.h:319
#define AFW_THROW_ERROR_FOOTPRINT_FZ(code, xctx, format_z,...)
Macro used to set error and 0 rv in xctx using line number in footprint then throw it.
Definition: afw_error.h:428
#define AFW_THROW_ERROR_Z(code, message_z, xctx)
Macro used to set error and 0 rv in xctx and throw it.
Definition: afw_error.h:283
#define AFW_THROW_ERROR_FOOTPRINT_RV_FZ(code, rv_source_id, rv, xctx, format_z,...)
Macro used to set error and rv in xctx using line number in footprint then throw it.
Definition: afw_error.h:449
afw_file_from_memory(const afw_utf8_t *file_path, const afw_memory_t *from_memory, afw_file_mode_t mode, afw_xctx_t *xctx)
Write a file from memory.
Definition: afw_file.c:136
afw_file_to_memory(const afw_utf8_t *file_path, apr_size_t file_size, const afw_pool_t *p, afw_xctx_t *xctx)
Read a file into memory in a specified pool.
Definition: afw_file.c:71
afw_object_remove_property(const afw_object_t *instance, const afw_utf8_t *property_name, afw_xctx_t *xctx)
Remove a property from object.
Definition: afw_object.c:35
#define AFW_OBJECT_Q_OBJECT_TYPE_ID_PROVISIONING_PEER
Quoted object type id for Provisioning Peer object.
Definition: afw_object.h:66
#define AFW_OBJECT_Q_OBJECT_TYPE_ID_JOURNAL_ENTRY
Quoted object type id for Journal Entry object.
Definition: afw_object.h:60
#define afw_pool_get_apr_pool(instance)
Call method get_apr_pool of interface afw_pool.
afw_size_t afw_safe_cast_off_to_size(afw_off_t off, afw_xctx_t *xctx)
Safely cast afw_off_t to afw_size_t.
Definition: afw_safe_cast.h:48
afw_dateTime_now_utc(const afw_pool_t *p, afw_xctx_t *xctx)
Get now time as dateTime in specified pool.
Definition: afw_time.c:593
const afw_utf8_t * afw_utf8_from_raw(const afw_memory_t *raw, const afw_pool_t *p, afw_xctx_t *xctx)
Convert raw to a utf-8 NFC normalizing if necessary in specified pool.
Definition: afw_utf8.h:199
afw_boolean_t afw_utf8_equal(const afw_utf8_t *s1, const afw_utf8_t *s2)
Check to see if a string equals another string.
const afw_utf8_z_t * afw_utf8_z_printf(const afw_pool_t *p, afw_xctx_t *xctx, const afw_utf8_z_t *format_z,...)
Definition: afw_utf8.h:854
const afw_utf8_z_t * afw_utf8_to_utf8_z(const afw_utf8_t *string, const afw_pool_t *p, afw_xctx_t *xctx)
Convert utf8 to utf8_z in specified pool.
Definition: afw_utf8.h:529
const afw_utf8_z_t * afw_utf8_z_concat(const afw_pool_t *p, afw_xctx_t *xctx,...)
afw_utf8_printf(const afw_pool_t *p, afw_xctx_t *xctx, const afw_utf8_z_t *format,...)
Create a utf-8 string using a c format string in specified pool.
Definition: afw_utf8.c:459
#define afw_xctx_calloc(size, xctx)
Macro to allocate cleared memory in xctx's pool.
Definition: afw_xctx.h:185
Internal request info used by afw_adaptor_impl*() functions.
Interface afw_adaptor_journal_inf_s struct.
Interface afw_adaptor_journal public struct.
date, time, and time zone.
Definition: afw_common.h:1760
error footprint
Definition: afw_error.h:154
Struct for memory pointer and size.
Definition: afw_common.h:505
Interface afw_object public struct.
Interface afw_pool public struct.
NFC normalized UTF-8 string.
Definition: afw_common.h:545
struct for data type object values.
Interface afw_value public struct.
Interface afw_xctx public struct.
32-bit unsigned big endian integer.
Definition: afw_endian.h:127
64-bit unsigned big endian integer.
Definition: afw_endian.h:103