-
Notifications
You must be signed in to change notification settings - Fork 0
/
wooter.cpp
2246 lines (2041 loc) · 86.5 KB
/
wooter.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#include <iostream>
#include <fstream>
#include <errno.h> // access, ENOENT
#include <sys/stat.h> // for mkdir
#include <string>
#include <sstream> // for concatenating ints and strings
#include <vector>
#include <thread>
#include <mutex>
// includes provided from code from Stevens Unix Network Programming, vol 1.
#include <stdio.h> // perror, snprintf
#include <stdlib.h> // exit
#include <unistd.h> // close, write
#include <string.h> // strlen
#include <strings.h> // bzero
#include <time.h> // time, ctime
#include <sys/socket.h> // socket, AF_INET, SOCK_STREAM,
// bind, listen, accept
#include <netinet/in.h> // servaddr, INADDR_ANY, htons
#include <arpa/inet.h> // inet_addr
// defines provided from code from Stevens Unix Network Programming, vol 1.
// NOTE(review): MAXLINE, BUFFSIZE and LISTENQ are not used in the visible part of this file;
// LISTENQ is presumably the listen() backlog used by listen_socket() — confirm there.
#define MAXLINE 4096
#define BUFFSIZE 8192
// SA is shorthand for the generic socket-address type, used in casts such as (SA *) NULL for accept().
#define SA struct sockaddr
#define LISTENQ 1024
// original defines
// PORT_PRIM is the primary RM's port; RM slot i (0 = primary) uses port PORT_PRIM+i,
// which is also its index in rm_connfds.
#define PORT_PRIM 13090
#define NUM_BKUPS 10 // this number includes the primary
// Sentinel stored in Functor::connfd by its move constructor so that ~Functor() does not close() it.
#define MOVED_CONNFD -10
using namespace std;
// FORWARD DECLARATION of function needed for Functor operator()()
void handle_php(int connfd, char* cmd, int cmd_size);
// GLOBAL VARIABLE for ~Functor()
// Set to true by poll_rm_ports() when PORT_PRIM is free (this process becomes the primary);
// also consulted by decide_status() and handle_being_polled().
bool am_primary=false;
// CLASSES DEFINED
struct User{
    /*
    User holds one user record as written to (flush_user()) and read back from (unflush_user())
     * the fixed-width user files.
     * Every scalar member has a default initializer, so a freshly allocated User (e.g. `new User`
     * in unflush_user()) never carries indeterminate values for a field that is not filled in.
    */
public:
    string username, password, full_name, email;
    unsigned int id=0, age=0, followers=0, followees=0, num_woots=0;
    char free_bit='\0'; // create_new_user() sets this to 'n' for a newly created user
};
class FileLock{
    /*
    FileLock associates a file path with a mutex that serializes access to that file.
     * The mutex is heap-allocated and held through a raw pointer, so moving a FileLock (as
     * std::vector does when it grows) hands the very same mutex object to the new FileLock;
     * the moved-from object is left with a null pointer and deletes nothing.
     * Copying and assignment are not allowed.
    */
public:
    FileLock() = delete;
    FileLock(const string& file_path) : file_name(file_path), file_mut(nullptr) {
        cout << "FileLock created for: " << file_path << endl;
        file_mut = new mutex;
    }
    FileLock(const FileLock&) = delete;
    FileLock& operator=(const FileLock&) = delete;
    FileLock& operator=(FileLock&&) = delete;
    FileLock(FileLock&& moved) : file_name(moved.file_name), file_mut(moved.file_mut) {
        cout << "FileLock being moved on: " << file_name << endl;
        moved.file_mut = nullptr; // the source must not delete the mutex it just handed over
    }
    ~FileLock(){
        cout << "FileLock being destroyed on: " << file_name << endl;
        delete file_mut; // no-op for a moved-from object (null pointer)
    }
    const string& get_name() const { return file_name; }
    mutex& mut(){ return *file_mut; }
private:
    string file_name;
    mutex* file_mut;
};
class Functor{
    /*
    Functor handles one request entirely inside its constructor (by calling handle_php()), and on
     * destruction releases what that request owned: the socket $connfd (only when this process is
     * the primary and the socket was not moved away) and the command buffer passed in as $cmd.
    */
public:
    Functor(int connfd, char* cmd, int size_of_msg) : connfd(connfd), clean_up(cmd) {
        cout << "Functor()" << endl;
        (*this)(connfd, cmd, size_of_msg); // the request is processed synchronously, right here
    }
    Functor(const Functor&)=delete;
    // Takes over the socket and buffer; the source is left owning nothing.
    Functor(Functor&& other) : connfd(other.connfd), clean_up(other.clean_up) { // never called, but just to be safe
        cout << "Functor(const Functor&&)" << endl;
        other.connfd=MOVED_CONNFD;
        other.clean_up=nullptr;
    }
    void operator()(int connfd, char* cmd, int size_of_msg){
        cout << "Functor::operator()()" << endl;
        handle_php(connfd, cmd, size_of_msg);
    }
    ~Functor(){
        cout << "~Functor()" << endl;
        const bool socket_is_ours = am_primary && (connfd != MOVED_CONNFD);
        if(socket_is_ours){ close(connfd); }
        // NOTE(review): scalar delete; if the caller allocates $cmd with new[] this must be
        // delete[] — confirm at the allocation site (not visible here).
        delete clean_up;
    }
private:
    int connfd;
    char* clean_up;
};
// GLOBAL VARIABLES
mutex lock_distributor; // used by mt_open to handle lock distribution
vector<FileLock> file_locks; // one FileLock per path handed to mt_open(); only ever appended to in the visible code
vector<int> rm_connfds(NUM_BKUPS); // initializes each item to 0; index i is the connection to the RM on port PORT_PRIM+i
// GLOBAL CONSTANTS
// Field widths of one fixed-width user record (see flush_user()); each "+1" is the pad character
// that keeps a maximum-length value separated from the next field.
const int MIN_USER_LEN=8, MAX_USER_LEN=24+1, MIN_PWD_LEN=10, MAX_PWD_LEN=32+1, MAX_ID_LEN=10+1; // at most 10 digits for the # of users
// MAX_FNM_LEN has no +1 of its own: the full-name field is MAX_FNM_LEN+MAX_LNM_LEN+1 wide.
const int MAX_FNM_LEN=25, MAX_LNM_LEN=25+1, MAX_EML_LEN=100+1, MAX_NUM_WOOTS=10; // at most 10 digits for the # of woots per user
// age is hardcoded to 4, 3 digits + 1 pad, so constant here
const int MAX_FLR_LEN=10+1, MAX_FLE_LEN=10+1; // at most 10 digits for the # of followees
// Full record length, including the trailing newline.
const int MAX_ULINE_LEN=(24)+1+(32)+1+(10)+1+(25)+1+(25)+1+(4)+1+(100)+1+(10)+1+(10)+1+(10)+1;
//const int MAX_ULINE_LEN=MAX_USER_LEN+MAX_PWD_LEN+MAX_ID_LEN+MAX_FNM_LEN+1+MAX_LNM_LEN+4+MAX_EML_LEN+MAX_FLR_LEN+MAX_FLE_LEN+1+MAX_NUM_WOOTS+1;
const int USERS_PER_FILE=10, FID_HDR_LEN=5, FLRS_PER_LINE=10, MAX_FLR_LINE_LEN=(MAX_ID_LEN*FLRS_PER_LINE)+1;
const char MY_DELIMITER=' '; // while this is a global, the code will only function if this is a whitespace character
const int WOOTS_PER_LINE=10, MAX_WOOT_LEN=100, MAX_WOOT_TMSTP=22, MAX_WOOT_LINE=(10)*(100+1+22+1);
const int MAX_WOOT_LINE_LEN=WOOTS_PER_LINE*(MAX_WOOT_LEN+1+MAX_WOOT_TMSTP+1);
// The literal lengths above must stay in sync with the per-field widths: every seek into a user
// file relies on MAX_ULINE_LEN being exactly what flush_user() writes (fields + free bit + newline).
static_assert(MAX_ULINE_LEN == MAX_USER_LEN + MAX_PWD_LEN + MAX_ID_LEN + (MAX_FNM_LEN + MAX_LNM_LEN + 1) + 4
                              + MAX_EML_LEN + MAX_FLR_LEN + MAX_FLE_LEN + 1 + MAX_NUM_WOOTS + 1,
              "MAX_ULINE_LEN no longer matches the user record layout written by flush_user()");
static_assert(MAX_WOOT_LINE == MAX_WOOT_LINE_LEN,
              "MAX_WOOT_LINE and MAX_WOOT_LINE_LEN describe the same woot line and must agree");
const string ROOT_PATH="/var/www/html";
string FILE_PATH="/var/www/html"; // reassigned per backup RM by decide_status()
const int MSG_SIZE=560; // every command/message on an RM connection is exactly this many bytes
void net_connection(char** argv);
// Program entry point: all server work happens in net_connection(), which receives argv;
// argc is not consulted.
int main(int argc, char **argv) {
    (void)argc; // intentionally unused
    net_connection(argv);
    return 0;
}
// REPLICATION HANDLING FUNCTIONS
int listen_socket(int& listenfd, int& connfd, int port); // it's a network function, but needed here
int rm_socket(int& rm_connfd, int port){
    /*
    rm_socket() creates a TCP socket in $rm_connfd and connects it to localhost (127.0.0.1) at
     * port $port. This is similar to listen_socket(), only using connect() instead of bind() and listen().
     *
     * On connect() failure the socket is closed here and $rm_connfd is set to -1, so no descriptor
     * leaks even when the caller does not close it; a caller's later close(-1) is a harmless EBADF
     * and can never hit a descriptor number that another thread has since reused. errno still holds
     * connect()'s error on return.
     * Exits the process with status 1 if the socket cannot be created at all.
     *
     * RETURN: The value returned by connect() (0 on success, -1 on failure).
    From Stevens Unix Network Programming, vol 1.
    Minor modifications by John Sterling
    Further minor modifications by Robert Ryszewski
    */
    struct sockaddr_in rm_addr;
    // 1. Create the socket
    if ((rm_connfd = socket(AF_INET, SOCK_STREAM, 0)) == -1) {
        perror("Unable to create a socket");
        exit(1);
    }
    // 2. Set up the sockaddr_in: every RM runs on this host, so always the loopback address
    memset(&rm_addr, 0, sizeof(rm_addr));
    rm_addr.sin_family = AF_INET;
    rm_addr.sin_addr.s_addr = inet_addr("127.0.0.1");
    rm_addr.sin_port = htons(port);
    // 3. Connect
    int success = connect(rm_connfd, reinterpret_cast<struct sockaddr*>(&rm_addr), sizeof(rm_addr));
    if ( success < 0 ) {
        int connect_errno = errno; // close() below may overwrite errno
        perror("connect failed. Error");
        close(rm_connfd);
        rm_connfd = -1;
        errno = connect_errno;
    }
    return success;
}
void write_rms(char* cmd){
    // write_rms() relays a command to every RM slot that holds a connection, so each RM can execute
    // it on its own data set. Exactly MSG_SIZE bytes are written from $cmd (assumes $cmd points to at
    // least MSG_SIZE bytes — TODO confirm at callers). Slots still 0 are treated as never connected.
    cout << "ATTEMPT: to write a command to all connected RMs" << endl;
    for(size_t slot=0; slot<rm_connfds.size(); ++slot){
        const int rm_fd = rm_connfds[slot];
        if(rm_fd == 0){ continue; } // default value: no connection in this slot
        const int sent = write(rm_fd, cmd, MSG_SIZE); // write to RM's listen socket
        cout << "SENT " << sent << " to RM at PORT # " << PORT_PRIM+slot << "." << endl;
    }
}
void handle_being_polled(int& connfd, int& listenfd){
    // handle_being_polled() handles receiving a "poll" cmd, whether this RM is the primary or not.
    // The polling connection is always closed. The primary simply carries on. A backup RM that was
    // waiting for the primary got a poll instead, so it accept()s again on $listenfd and $connfd is
    // replaced by that new connection (-1 if accept() failed).
    cout << "RECEIVED: poll cmd" << endl;
    close(connfd);
    if(am_primary){ return; }
    fprintf(stderr, "Ready to connect!\n");
    connfd = accept(listenfd, (SA *) NULL, NULL);
    if(connfd == -1){
        perror("accept failed"); // inexplicable error
    }
    fprintf(stderr, "Connected\n");
}
void handle_new_rm(const char* cmd, int& connfd){
/*
handle_new_rm() handles adding a new RM, that has notified the primary of its existence, to the
* list of all active RM connections.
*/
stringstream port_ss;
port_ss.str(string(cmd));
string port_str, garbage;
port_ss >> garbage >> port_str;
close(connfd); // close the current connection to the RM, which is solely for notification purposes
int rm_connfd; // open a socket to the RM's listen socket
int success=rm_socket(rm_connfd, stoi(port_str) );
if(success >= 0) { // if connected to RM successfully, notify them/don't just let nothing be sent
garbage="SUCCESSFUL CONNECTION TO PRIMARY";
garbage.resize(MSG_SIZE);
write(rm_connfd, garbage.c_str(), MSG_SIZE);
// if you had a previous connection to an RM on that port/address, close it as it is no longer valid
if(rm_connfds[stoi(port_str)-PORT_PRIM] !=0) { close(rm_connfds[stoi(port_str)-PORT_PRIM]); }
cout << "SUCCESS: Adding rm #" << port_str << endl;
rm_connfds[stoi(port_str)-PORT_PRIM]=rm_connfd;
}
else { close(rm_connfd); } // seemingly impossible, as only get notified once RM has a listen socket set up
}
void connect_rms(){
/*
connect_rms() is called only by the primary RM and attempts to establish connections
* to all possible RM hosts (within the fixed # of NUM_BKUPS), and appends them to the
* global vector of RM connections if successful.
*/
cout << "ATTEMPT: as primary to connect to RMs" << endl;
for( size_t i=1; i<NUM_BKUPS; i++){
int rm_connfd=-1;
cout << "ATTEMPT: to connect to RM at port #" << PORT_PRIM+i << endl;
int success=rm_socket(rm_connfd, PORT_PRIM+i);
if(success >= 0) { // able to reach the host at that connection
cout << "SUCCESS: to connect to RM at port #" << PORT_PRIM+i << endl;
rm_connfds[i]=rm_connfd; // update our global list of RM connfds
}
else {
cout << "FAILURE: to connect to RM at port #" << PORT_PRIM+i << endl;
close(rm_connfd);
}
}
}
int decide_status(int& connfd, int& listenfd, int my_port){
    /*
    decide_status() sets up this RM according to whether or not it is the primary.
     * Primary: a listen socket is created on $my_port, then connections to all reachable backup RMs
     *   are opened. RMs created later notify the primary and are added to the RM connection list.
     * Backup RM: FILE_PATH is switched to ROOT_PATH/rm_<slot> (created if missing). The RM then
     *   1. connects to the primary and opens its own listen socket,
     *   2. notifies the primary with "new_rm <port>",
     *   3. copies the primary's data directories with `cp -r`,
     *   4. accepts the primary's connection on $listenfd; replicated commands are read from $connfd.
     * RETURN: 0 when set up succeeded, -1 when a step failed. The temporary connection to the primary
     *   is closed on every path.
    */
    cout << "ATTEMPT: to decide status" << endl;
    if(am_primary){ // connect to rms and wait for client requests
        if(listen_socket(listenfd, connfd, my_port) == -1) { return -1; } // listen_socket() already handles perror
        connect_rms();
        return 0;
    }
    // connect to primary and wait for commands
    // adjust your file path accordingly, since only one RM can occupy the local /var/www/html
    FILE_PATH=ROOT_PATH+"/rm_"+to_string(my_port-PORT_PRIM);
    int result=access(FILE_PATH.c_str(), F_OK);
    if( (result < 0) && (errno == ENOENT) ) { // if directory does not exist
        cerr << "The directory: " << FILE_PATH << " does not exist" << endl;
        if(mkdir(FILE_PATH.c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH) == -1){
            perror("mkdir failed"); // without the directory the data copy below cannot succeed
            return -1;
        }
        cerr << "Have created dir! " << endl;
    } // else the directory already exists
    // this would simply be omitted for RMs on different machines
    // connect to primary, create a listen socket, then tell primary you are a new RM, wait for its response
    int prim_connfd=-1;
    if( rm_socket(prim_connfd, PORT_PRIM) == -1){
        perror("rm_socket failed");
        if(prim_connfd >= 0) { close(prim_connfd); } // don't leak the socket of the failed connect()
        return -1;
    }
    if( listen_socket(listenfd, connfd, my_port) == -1) { // listen_socket() already handles perror
        close(prim_connfd); // free up that connection
        return -1;
    }
    // only tell primary to add you when your listen socket is in fact set up, so no race condition
    string notify_prim="new_rm "+to_string(my_port);
    notify_prim.resize(MSG_SIZE);
    int bytes_sent=write(prim_connfd, notify_prim.c_str(), MSG_SIZE);
    if(bytes_sent!=MSG_SIZE){
        cerr << "FAILURE: on new RM notifying primary" << endl; // seems non-fatal
    }
    // while you have the primary occupied and no client requests can come in, duplicate the files
    string cmd="cp -r "+ ROOT_PATH + "/fids " + ROOT_PATH + "/users " + ROOT_PATH + "/flwes " + ROOT_PATH + "/flwrs " + ROOT_PATH + "/woots " + FILE_PATH;
    // on a multiple machine system, this should not be a blocking call
    // and would thus run into all sorts of race condition problems/problems with freshness/validity of data
    if(system(cmd.c_str()) != 0){
        cerr << "FAILURE: could not copy the primary's data into " << FILE_PATH << endl; // replica may be stale or incomplete
    }
    close(prim_connfd); // free up that port
    // c_str() stops at the NUL padding added by resize(), so only "new_rm <port>" is logged
    cout << "SUCCESS: wrote new_rm # " << notify_prim.c_str() << " to primary" << endl;
    fprintf(stderr, "Ready to connect!\n");
    if ((connfd = accept(listenfd, (SA *) NULL, NULL)) == -1) {
        perror("accept failed");
        return -1;
    }
    fprintf(stderr, "Connected\n");
    return 0; // RM set up okay
}
int poll_rm_ports(bool attempted_primary=false){
    /*
    poll_rm_ports() returns the first RM port (PORT_PRIM .. PORT_PRIM+NUM_BKUPS-1) on which nothing
     * accepts a connection, i.e. the first free RM slot. Claiming PORT_PRIM sets am_primary to true.
     * If $attempted_primary is true, PORT_PRIM is skipped (the caller just failed to take it).
     * Every port that does answer is sent a MSG_SIZE "poll" message so its owner knows the
     * connection is only a probe, and is then closed.
     * If no port is free, the process exits with status 42.
    */
    cout << "ATTEMPT: to poll ports" << endl;
    const size_t first_slot = attempted_primary ? 1 : 0;
    for(size_t slot=first_slot; slot<NUM_BKUPS; ++slot){
        const size_t port = PORT_PRIM + slot;
        int probe_fd;
        cout << "ATTEMPT: to poll port #: " << port << endl;
        if(rm_socket(probe_fd, port) >= 0){
            // someone owns this port: tell them this connection is only a poll
            string just_poll = "poll";
            just_poll.resize(MSG_SIZE);
            write(probe_fd, just_poll.c_str(), MSG_SIZE);
            close(probe_fd);
            continue;
        }
        // nobody answered, so the port is available
        if(slot == 0){ am_primary = true; }
        cout << "SUCCESS: port # " << port << " polled as available " << endl;
        close(probe_fd);
        return port;
    }
    cerr << "FAILURE: All RM ports taken, none left available" << endl;
    exit(42);
}
// MULTI-THREADING HANDLER FUNCTION
mutex& mt_open(string path){
/*
mt_open() takes in a string absolute file path to some file, and locks the FileLock object for that file.
* If the FileLock object does not exist, it is created.
* This allows every existing user handling function to function as before, and just first call mt_open() to lock the resources it needs.
*
Distributing locks here is a critical region in itself.
A condition variable is unnecessary as the "condition" is simply whether somebody is in the region or not.
* Thus a simple big lock suffices. A cond_var however would do away with the busy waiting of a lock.
* Thus a cond_var may be used in future revisions.
A big lock is necessary here so that there are no race conditions in checking if a given FileLock
* has already been created, and double creating. This does introduce a bottleneck of sequentialism
* but it allows for creating a lock per file object, and thus is far less sequential than a single big lock for EVERY file access.
*/
unique_lock<mutex> global_ul(lock_distributor); // lock this critical region
cout << "Thread: " << this_thread::get_id() << "\t\twaiting on\tlock:\t" << path << endl;
size_t lock_pos=0;
bool found=false;
while( lock_pos < file_locks.size() ){ // try to find the position of the desired FielLock
if(file_locks[lock_pos].get_name()==path){
found=true;
break;
}
lock_pos++;
}
if(!found){ // if not found, create it on the heap to avoid needing to deal with move construction
file_locks.push_back(FileLock(path));
cout << "Thread: " << this_thread::get_id() << "\t\tobtained\tlock:\t" << path << endl;
int this_lock_pos=file_locks.size()-1; // before unlocking, to prevent race conditions
global_ul.unlock(); // MUST unlock this region before attempting to lock your FileLock, so that a single FileLock attempt does not hold up EVERY call to mt_open
return file_locks[this_lock_pos].mut();// that would simply lead to deadlocks. this way only threads waiting on the same FileLock object wait on each other
} else{ // it was found
cout << "Thread: " << this_thread::get_id() << "\t\tobtained\tlock:\t" << path << endl;
global_ul.unlock(); // unlock this critical region, action is done, again must do so to avoid deadlock
return file_locks[lock_pos].mut();
}
// serious error if ever get here
}
// USER HANDLING FUNCTIONS
void flush_user(stringstream& file_name_ss, User* user_obj, int offset){
    /*
    flush_user() overwrites one fixed-width user record in the file named by $file_name_ss,
     * starting at byte $offset. Every attribute is right-aligned and space-padded to its maximum
     * field width (default stream alignment and fill), so each field, each record line, and each user
     * file keep a constant size. The record ends with a newline.
     * The file must already exist: it is opened with fstream's default in|out mode, which does not
     * create files. Exits with status 6 if it cannot be opened.
    */
    const string record_path = file_name_ss.str();
    fstream user_file;
    user_file.open(record_path.c_str());
    if(!user_file){
        cerr << "Error opening: " << record_path << endl;
        exit(6);
    }
    user_file.seekp(offset, ios::beg);
    // width() only applies to the next formatted insertion, so it is set again before every field
    user_file.width(MAX_USER_LEN);                user_file << user_obj->username;
    user_file.width(MAX_PWD_LEN);                 user_file << user_obj->password;
    user_file.width(MAX_ID_LEN);                  user_file << user_obj->id;
    user_file.width(MAX_FNM_LEN+MAX_LNM_LEN+1);   user_file << user_obj->full_name;
    user_file.width(4);                           user_file << user_obj->age; // 3 digits + 1 pad
    user_file.width(MAX_EML_LEN);                 user_file << user_obj->email;
    user_file.width(MAX_FLR_LEN);                 user_file << user_obj->followers;
    user_file.width(MAX_FLE_LEN);                 user_file << user_obj->followees;
    user_file.width(1);                           user_file << user_obj->free_bit;
    user_file.width(MAX_NUM_WOOTS);               user_file << user_obj->num_woots;
    user_file << endl;
    user_file.close();
}
void fids_decrease(int line_num){
/*
fids_decrease() takes a line number, which corresponds to a user file u_$line_num, and decreases
the number of free spots/ids (fids) in that file. If $line_num is larger than the # of lines,
a new line is written of $USERS_PER_FILE-1.
* Exits with status 10 if fids.txt cannot be opened.
* Layout assumption: line k of fids.txt starts at byte k*(num_digits+1), where num_digits is the
* digit count of USERS_PER_FILE-1 (1 for USERS_PER_FILE=10), i.e. every line is one count + '\n'.
* NOTE(review): create_file("fids") initially writes USERS_PER_FILE ("10", two digits). The first
* decrement overwrites it with "9\n", leaving a stray '\n' at byte 2, which the next new line
* (written at byte 2) then overwrites — the layout only works while counts written here stay single-digit.
* NOTE(review): a count of 0 is indistinguishable from "line missing" below, so calling this on a
* full line would reset it to USERS_PER_FILE-1; create_new_user() only calls it for a line with free slots.
*/
string fids_path = FILE_PATH +"/fids/fids.txt";
fstream fids;
fids.open(fids_path.c_str());
if(!fids){
cerr << "Error opening: " << fids_path << endl;
exit(10);
}
// read line_num+1 whitespace-separated tokens with >>; the last one is line $line_num's count
// (left empty if the file has fewer lines)
string current_free_ch;
for(int i=0; i<line_num+1; i++){
current_free_ch.clear();
fids >> current_free_ch;
}
// get amount to seek by, decrement current_free
stringstream generic_ss; // will be reused for ints then chars
generic_ss << USERS_PER_FILE-1;
int num_digits=generic_ss.str().length();
int current_free=atoi(current_free_ch.c_str());
if(current_free==0){
// no count was read (new line): a new user file starts with USERS_PER_FILE-1 free after this user;
// clear() resets the fail/eof flags left by reading past the end so the write below works
current_free=USERS_PER_FILE-1;
fids.clear();
}
else{ current_free--; }
// seek where needed to write back current_free, do so
fids.seekp(line_num*(num_digits+1), fids.beg);
generic_ss.str("");
generic_ss << current_free;
fids << generic_ss.str() << endl;
fids.close();
}
void create_file(string file_type, string file_row_s, string file_column_s){
    /*
    create_file() creates a file of the given $file_type and uses $file_row and $file_column
    to satisfy a naming convention. User files are one dimensional, from u_0.txt to u_n.txt.
    Followers, followees, and woot files use the row dimension to mirror the user_files,
    and have a column dimension to allow for a fixed # objects per file per user.
     * Missing data directories (users, fids, flwrs, flwes, woots) under FILE_PATH are created first.
     * Valid types: "users", "fids", "followers", "followees", "woots". The new file is filled with
     *   blank fixed-width lines (for "fids": a single line holding USERS_PER_FILE).
     * An unknown $file_type is reported on stderr and nothing is created. It must be rejected before
     *   the existence check: otherwise the check would run on the bare directory path, which exists,
     *   and the process would exit(8) with a misleading "already exists" message.
     * Exits with status 8 if the target file already exists, 9 if it cannot be opened for writing.
    */
    int file_row=atoi(file_row_s.c_str());
    int file_column=atoi(file_column_s.c_str());
    string start_path=FILE_PATH+"/";
    // check if any of the dirs needed dont exist, if so, make them
    const char* dirs[]={"users", "fids", "flwrs", "flwes", "woots"};
    for(const char* dir : dirs){
        string dir_path=start_path+dir;
        int result=access(dir_path.c_str(), F_OK);
        if( (result < 0) && (errno == ENOENT) ) { // if directory does not exist
            cerr << "The directory: " << start_path << dir << " does not exist" << endl;
            mkdir(dir_path.c_str(), S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
            cerr << "Have created dir! " << endl;
        } // else the directory already exists
    }
    // choose the file name and the content of one blank line from the file type
    stringstream file_name_ss;
    file_name_ss << start_path;
    string blank_line;
    int num_lines=USERS_PER_FILE; // one line per user slot, except for fids
    if(file_type=="users"){
        file_name_ss << "users/u_" << file_row << ".txt";
        blank_line=string(MAX_ULINE_LEN-1, MY_DELIMITER); // -1: the newline completes the record
    }
    else if(file_type=="fids"){
        file_name_ss << "fids/fids.txt";
        blank_line=to_string(USERS_PER_FILE); // that many users slots are free when the file is made
        num_lines=1; // fids_decrease is called after making fids file/flushing user
    }
    else if(file_type=="followers"){
        file_name_ss << "flwrs/fr_r" << file_row << "c" << file_column << ".txt";
        blank_line=string(MAX_ID_LEN*FLRS_PER_LINE, MY_DELIMITER);
    }
    else if(file_type=="followees"){
        file_name_ss << "flwes/fe_r" << file_row << "c" << file_column << ".txt";
        blank_line=string(MAX_ID_LEN*FLRS_PER_LINE, MY_DELIMITER);
    }
    else if(file_type=="woots"){
        file_name_ss << "woots/w_r" << file_row << "c" << file_column << ".txt";
        blank_line=string(MAX_WOOT_LINE_LEN, MY_DELIMITER);
    }
    else{
        cerr << "Could not create a file of unknown type: " << file_type << endl;
        return;
    }
    if(access(file_name_ss.str().c_str(), F_OK) == 0){ // if file does exist
        cerr << "Could not create: " << file_name_ss.str() << " as it already exists." << endl;
        exit(8);
    }
    ofstream new_file;
    new_file.open(file_name_ss.str().c_str());
    if(!new_file){
        cerr << "Error opening: " << file_name_ss.str() << endl;
        exit(9);
    }
    for(int line=0; line<num_lines; line++){ new_file << blank_line << '\n'; } // fill the file with empty lines
    new_file.close();
}
User* create_new_user(User* user_obj){
/*
create_new_user() takes in a user object assembled from the registration page and fully
integrates that user into the system. This is done by finding it a free user ID and by
writing the user's attributes to the first available spot in the user file base.
* On success $user_obj itself is updated (id, followers=0, followees=0, num_woots=0, free_bit='n'),
* written to disk, and returned for convenience.
* This function assumes that read_user() has been checked first before calling,
* and does not further check if the user is in the database already.
* Locking: the fids.txt lock is taken first and held throughout, then the chosen user file's lock.
* Returns a NULL pointer if no free slot was found in the chosen user file.
* Exits with status 5 (fids.txt unopenable) or 7 (user file unopenable).
*/
int current_line=0;
string fids_path = FILE_PATH+"/fids/fids.txt";
unique_lock<mutex> fids_ul(mt_open(fids_path)); // OBTAIN THE FIDS LOCK
int result=access(fids_path.c_str(), F_OK);
if( (result < 0) && (errno == ENOENT) ) { // if file does not exist
// first user ever: create fids.txt; current_line stays 0
create_file("fids","0","0");
}
else{ // if file does exist
fstream fids; // gave up the "i" so that wouldnt have to template mt_open
fids.open(fids_path.c_str());
if(!fids){
cerr << "Error opening: " << fids_path << endl;
fids.close();
// exit() does not unwind the stack, so the unique_lock destructor would never release the lock
fids_ul.unlock(); // RELEASE LOCK EXPLICITLY
exit(5);
}
// skip lines whose free count is 0; if every line is 0, current_line ends up one past the last
// line (a failed >> at EOF stores 0 and sets eof, which ends the loop)
int num_free=0;
fids >> num_free;
while(num_free == 0){ // find the first user file # with free slots for new users
if(!fids.eof()){
current_line++;
fids >> num_free;
}
else { break; }
}
fids.close();
} // current line is now which user file you want
// if the user file does not exist, make it, flush the user, update fids
stringstream user_path_ss;
user_path_ss << FILE_PATH << "/users/u_" << current_line << ".txt";
unique_lock<mutex> uf_ul(mt_open(user_path_ss.str())); // OBTAIN USER FILE LOCK
result=access(user_path_ss.str().c_str(), F_OK);
if( (result < 0) && (errno == ENOENT) ) {
// new user file: this user takes its first slot
int user_id=current_line*USERS_PER_FILE;
stringstream current_line_str_ss;
current_line_str_ss << current_line;
create_file("users", current_line_str_ss.str(), " ");
user_obj->id=user_id;
user_obj->followers=0;
user_obj->followees=0;
user_obj->num_woots=0;
user_obj->free_bit='n';
fids_decrease((user_obj->id/USERS_PER_FILE)); // this is why still need the fids_lock
flush_user(user_path_ss, user_obj, 0); // 0 since this is a new file being made
uf_ul.unlock(); // RELEASE LOCK EXPLICITLY
fids_ul.unlock(); // RELEASE LOCK EXPLICITLY
return user_obj;
}
else { // if the file exists, it has free space for new users, since we checked the fids
// if file exists, you still have the lock on the file, proceed
fstream user_file;
user_file.open(user_path_ss.str().c_str());
if(!user_file){
cerr << "Error opening: " << user_path_ss.str() << endl;
uf_ul.unlock(); // RELEASE LOCK EXPLICITLY
fids_ul.unlock(); // RELEASE LOCK EXPLICITLY
exit(7);
}
int current_id=0;
char current_bit='\0';
// byte offset of the free-bit column inside a record: it sits just before the num_woots field
// and the trailing newline
int bit_pos=MAX_ULINE_LEN-MAX_NUM_WOOTS-1-1;
user_file.seekg(bit_pos, user_file.cur);
while(current_id<USERS_PER_FILE){ // scan until you find the first free spot
current_bit='\0';
// >> skips whitespace: on a blank slot it skips ahead and only leaves current_bit == '\0' when
// nothing but whitespace remains to EOF. NOTE(review): this finds the first free slot only if
// free slots are always at the end of the file — confirm no code frees a slot in the middle.
user_file >> current_bit;
if(current_bit=='\000'){ // fstream wont read whitespace
user_obj->id=current_id+(USERS_PER_FILE*current_line);
user_obj->followers=0;
user_obj->followees=0;
user_obj->num_woots=0;
user_obj->free_bit='n';
user_file.close();
fids_decrease(user_obj->id / USERS_PER_FILE); // protected by the fids lock
flush_user(user_path_ss, user_obj, current_id*MAX_ULINE_LEN); // protected by the overarching mt_open on this user file
uf_ul.unlock(); // RELEASE LOCK EXPLICITLY
fids_ul.unlock(); // RELEASE LOCK EXPLICITLY
return user_obj;
}
// the 1-char bit was consumed, so MAX_ULINE_LEN-1 more lands on the next record's free bit
user_file.seekg(MAX_ULINE_LEN-1, user_file.cur);
current_id+=1;
}
}
// no free slot found even though fids said there was one
uf_ul.unlock(); // RELEASE LOCK EXPLICITLY
fids_ul.unlock(); // RELEASE LOCK EXPLICITLY
return nullptr;
}
User* unflush_user(string file_name, int offset, bool unsure=false){
    /*
    unflush_user() reads one user record from $file_name starting at byte $offset. Fields are
     * separated by their whitespace padding, so each is read with >> and the padding disappears.
     * $offset must be the start of one of the USERS_PER_FILE records, i.e. a multiple of MAX_ULINE_LEN
     *   within [0, MAX_ULINE_LEN*USERS_PER_FILE).
     * $unsure marks a lookup by a possibly arbitrary id (read_id()). An out-of-range offset or an
     *   unopenable file then returns NULL instead of exiting (status 10 / 11 otherwise).
     * A null pointer is returned if the user could not be unflushed: the file does not exist, or no
     *   username/password/id could be read at $offset (e.g. a blank, never-used slot at the end of the file).
     * A valid return of a user object is one allocated on the heap. Caller is responsible for freeing up this memory.
     * THREAD-SAFETY: no locking here; read_user() and read_id() hold the mt_open() lock of $file_name while calling this.
    */
    const int end_of_records=MAX_ULINE_LEN*USERS_PER_FILE;
    if(offset < 0 || offset >= end_of_records) { // end_of_records itself is already past the last record
        cerr << "Error: tried to read line # " << offset/MAX_ULINE_LEN << " when only " << USERS_PER_FILE << " user lines per file " << endl;
        if(unsure){ return NULL; }
        exit(10);
    }
    int result=access(file_name.c_str(), F_OK);
    if( (result<0)&&(errno==ENOENT) ){ // compare errno (an '=' here would assign it and always be true)
        return NULL; // user's file does not exist, user does not exist
    }
    ifstream user_base;
    user_base.open(file_name.c_str());
    if(!user_base){
        if(unsure){ // if doing read_id and id might be arbitrary
            cerr << "Error: the file " << file_name << " does not exist in the user base" << endl;
            return NULL;
        }
        cerr << "Error opening: " << file_name << endl;
        exit(11);
    }
    user_base.seekg(offset, user_base.beg);
    // create a new user object to pull out of the file database
    User* user_obj=new User;
    user_base >> user_obj->username >> user_obj->password >> user_obj->id;
    if(!user_base){ // blank slot (>> ran into EOF) or unreadable record: there is no user here
        delete user_obj;
        user_base.close();
        return NULL;
    }
    string first_name, last_name;
    user_base >> first_name >> last_name;
    user_obj->full_name=first_name + ' ' + last_name;
    user_base >> user_obj->age >> user_obj->email >> user_obj->followers
              >> user_obj->followees >> user_obj->free_bit >> user_obj->num_woots;
    user_base.close();
    return user_obj;
}
User* read_user(string username){
    /*
    read_user() scans the user files u_0.txt, u_1.txt, ... in order for a record whose username
     * equals $username, and returns that user unflushed from the file.
     * Each file is locked (mt_open) while it is scanned. The scan stops with NULL at the first
     *   missing user file, i.e. the user does not exist.
     * The per-file loop is driven by the success of reading a username, so a blank slot or EOF is never
     *   compared against $username (an empty $username could otherwise "match" a blank slot).
     * Exits with status 12 if an existing user file cannot be opened.
     * Otherwise a new heap-allocated user object is returned (may be NULL if unflush_user() fails),
     *   which the caller is responsible for freeing.
    */
    for(int file_number=0; ; file_number++){ // search every user file
        stringstream file_name_ss;
        file_name_ss << FILE_PATH << "/users/u_" << file_number << ".txt";
        unique_lock<mutex> file_lock(mt_open(file_name_ss.str())); // OBTAIN LOCK (released by its destructor)
        int result=access((file_name_ss.str()).c_str(), F_OK);
        if( (result < 0) && (errno == ENOENT) ) { // if file does not exist
            cerr << "The file: " << file_name_ss.str() << " does not exist" << endl;
            return NULL; // return nothing if not found
        }
        fstream user_base;
        user_base.open(file_name_ss.str().c_str());
        if(!user_base){
            cerr << " The file: " << file_name_ss.str() << " could not be opened." << endl;
            file_lock.unlock(); // exit() does not unwind the stack, so release explicitly
            exit(12);
        }
        string read_username;
        while(user_base >> read_username){ // stops at EOF / the blank slots at the end of the file
            if(read_username == username){ // if it matches what we want, build and return that user object
                // usernames are right-aligned in a MAX_USER_LEN-wide field, so this is the record start
                user_base.seekg(-MAX_USER_LEN, user_base.cur);
                int offset=user_base.tellg();
                user_base.close();
                return unflush_user(file_name_ss.str(), offset); // this could be null; lock held until return
            }
            // not a match: jump from the end of the username field to the start of the next record
            user_base.seekg(MAX_ULINE_LEN-MAX_USER_LEN, user_base.cur);
        }
        user_base.close();
    }
}
User* read_id(int id_num){
	/*
	read_id() fetches a user directly by ID number in O(1), rather than scanning the whole user base
	the way read_user() does (useful for follower/followee lookups).
	The record is read from FILE_PATH/users/u_<id_num / USERS_PER_FILE>.txt at byte offset
	MAX_ULINE_LEN * (id_num % USERS_PER_FILE), with that file's lock held for the read.
	Returns what unflush_user() returns: a heap-allocated User that the caller must delete, or a
	null pointer (e.g. when the user file does not exist; the "true" flag marks the ID as possibly invalid).
	*/
	const int file_index=id_num/USERS_PER_FILE;
	stringstream path_ss;
	path_ss << FILE_PATH << "/users/u_" << file_index << ".txt";
	const string path=path_ss.str();
	unique_lock<mutex> guard(mt_open(path)); // OBTAIN LOCK; released when guard leaves scope, after the read
	return unflush_user(path, MAX_ULINE_LEN*(id_num % USERS_PER_FILE), true); // true: unsure the ID is valid
}
string already_following(User* home_user, int followee_id, char type='e'){
	/*
	already_following() takes in a logged in $home_user and a user ID, and reports where that ID is
	already recorded in one of $home_user's relationship lists. $type selects the list:
	'e' (default) searches the followee files FILE_PATH/flwes/fe_r<row>c<col>.txt,
	'r' searches the follower files FILE_PATH/flwrs/fr_r<row>c<col>.txt.
	Column files c0, c1, ... are read in order, each one's lock held only while it is read, until a
	column file is missing or as many IDs have been counted as the user's followee/follower total.
	Returns "<file name> <line #> <slot #>" for the first slot holding $followee_id, or an empty
	string if it is not found (or if an existing column file cannot be opened).
	Callers should check for the empty string.
	*/
	int user_row=home_user->id/USERS_PER_FILE;
	int line_num=home_user->id % USERS_PER_FILE; // the user's line inside every column file of this row
	int current_column=0;
	const int slots_per_line=FLRS_PER_LINE;
	const string empty_slot(MAX_ID_LEN, MY_DELIMITER); // an all-delimiter slot is a hole left by unfollow()ing
	stringstream file_name_ss;
	file_name_ss << FILE_PATH << "/flw" << type << "s/f" << type << "_r" << user_row << "c" << current_column << ".txt";
	unique_lock<mutex> cult_file(mt_open(file_name_ss.str())); // OBTAIN LOCK ON CULT FILE (released automatically on return)
	unsigned int followees_read=0;
	int result=access(file_name_ss.str().c_str(), F_OK);
	unsigned int total_people=type=='r'?home_user->followers:home_user->followees;
	while( !((result < 0)&&(errno==ENOENT)) && followees_read < total_people){
		ifstream following;
		following.open(file_name_ss.str().c_str());
		if(!following){ // otherwise every slot would read back as '\0' bytes and parse as ID 0
			cerr << "Could not open file: " << file_name_ss.str() << endl;
			return "";
		}
		following.seekg(line_num*(1+FLRS_PER_LINE*MAX_ID_LEN)); // each line: FLRS_PER_LINE fixed-width slots + '\n'
		int last_token=-1;
		// read slot by slot (fixed width) because unfollow()ing leaves holes in the line
		for(int current_fle=0; current_fle < slots_per_line; current_fle++){
			// The raw slot bytes are not NUL-terminated, so read into a string of explicit length
			// (building a string from the bare char* would read past the buffer). A short read leaves '\0' padding.
			string slot(MAX_ID_LEN, '\0');
			following.read(&slot[0], MAX_ID_LEN);
			if(slot == empty_slot){ continue; } // hole: nothing recorded here
			stringstream slot_ss(slot);
			int current_id=-1;
			slot_ss >> current_id;
			// kept from the original logic: an ID equal to the previous one ends the line's data
			if(current_id==last_token){ break; }
			followees_read++;
			last_token=current_id;
			if(current_id==followee_id) {
				stringstream triplet_ss;
				triplet_ss << file_name_ss.str() << ' ' << line_num << ' ' << current_fle;
				return triplet_ss.str();
			}
		}
		following.close();
		cult_file.unlock(); // RELEASE before taking the next column's lock
		current_column++;
		file_name_ss.str("");
		file_name_ss << FILE_PATH << "/flw" << type << "s/f" << type << "_r" << user_row << "c" << current_column << ".txt";
		cult_file=unique_lock<mutex>(mt_open(file_name_ss.str())); // OBTAIN NEXT LOCK
		result=access(file_name_ss.str().c_str(), F_OK);
	} // if exit the loop, ran into a non existent file or already read the entire cult and found nothing
	return ""; // should check on receiving end that not empty string
}
string already_followed_by(User* other_user, int follower_id){
	/*
	already_followed_by() is the follower-side counterpart of already_following(). It looks for
	$follower_id (the logged in user wanting to follow) in target user $other_user's list of followers.
	Returns "<file name> <line #> <slot #>" where that ID is recorded, or an empty string if
	$other_user is not currently followed by $follower_id.
	*/
	const char follower_list='r'; // 'r' selects the flwrs/fr_... follower files
	return already_following(other_user, follower_id, follower_list);
}
string first_free(User* user_obj, char type){
	/*
	first_free() takes in a user and a char indicating either 'r' for "followers" or 'e' for "followees",
	and returns the first location where a new follower/followee ID can be recorded, formatted as
	"<file name> <line #> <slot #> <column #>". There are three cases:
	  1) a previous follower/followee left a hole (a slot made only of MY_DELIMITER) in an existing
	     column file: that file, the user's line, the hole's slot #, and the file's column #;
	  2) no hole was found, but the next column file already exists (the user has simply not expanded
	     into it yet): that file, the user's line, slot 0, and its column #;
	  3) no hole was found and the next column file does not exist: "-1 <line #> -1 <column #>",
	     meaning the caller must create column file <column #> itself.
	Column files c0, c1, ... are examined in order; each file's lock is held only while it is examined.
	Exits with code 13 for an invalid $type, and code 14 if an existing column file cannot be opened.
	*/
	int group=-1;
	if(type=='e'){ group=user_obj->followees; }
	else if(type=='r') { group=user_obj->followers; }
	else {
		cerr << "An invalid type has been passed to first_free. Exiting" << endl;
		exit(13);
	}
	int user_row=user_obj->id/USERS_PER_FILE;
	int line_num=user_obj->id%USERS_PER_FILE; // the user's line inside every column file of this row
	int current_column=0;
	const int slots_per_line=FLRS_PER_LINE;
	const string empty_slot(MAX_ID_LEN, MY_DELIMITER); // an all-delimiter slot is a hole left by unfollowing
	stringstream file_name_ss;
	file_name_ss << FILE_PATH << "/flw" << type << "s/f" << type << "_r" << user_row << "c" << current_column << ".txt";
	unique_lock<mutex> cult_file(mt_open(file_name_ss.str())); // OBTAIN THE CULT FILE LOCK (released automatically on return)
	int followers_read=0;
	int result=access(file_name_ss.str().c_str(), F_OK);
	// keep scanning while the column file exists and not every recorded ID has been counted yet
	while( !((result < 0)&&(errno==ENOENT)) && followers_read < group){
		ifstream following;
		following.open(file_name_ss.str().c_str());
		if(!following){
			cerr << "Could not open file: " << file_name_ss.str() << endl;
			cult_file.unlock(); // exit() does not run destructors, so release explicitly
			exit(14);
		}
		following.seekg(line_num*(1+FLRS_PER_LINE*MAX_ID_LEN)); // each line: FLRS_PER_LINE fixed-width slots + '\n'
		// no (followers_read < group) bound here: finish the line so a free slot on it can be returned
		for(int current_fle=0; current_fle < slots_per_line; current_fle++){
			// The raw slot bytes are not NUL-terminated, so read into a string of explicit length
			// (building a string from the bare char* would read past the buffer). A short read leaves '\0' padding.
			string slot(MAX_ID_LEN, '\0');
			following.read(&slot[0], MAX_ID_LEN);
			if(slot == empty_slot){ // case 1: a free gap on this line
				stringstream location_ss;
				location_ss << file_name_ss.str() << ' ' << line_num << ' ' << current_fle << ' ' << current_column;
				return location_ss.str();
			}
			followers_read++;
		}
		following.close();
		current_column++;
		file_name_ss.str("");
		file_name_ss << FILE_PATH << "/flw" << type << "s/f" << type << "_r" << user_row << "c" << current_column << ".txt";
		cult_file.unlock(); // RELEASE before taking the next column's lock
		cult_file=unique_lock<mutex>(mt_open(file_name_ss.str())); // OBTAIN NEXT CULT FILE LOCK
		result=access(file_name_ss.str().c_str(), F_OK);
	}
	stringstream location_ss;
	if( !((result < 0)&&(errno==ENOENT)) ) { // case 2: file exists; start filling it at slot 0
		location_ss << file_name_ss.str() << ' ' << line_num << ' ' << 0 << ' ' << current_column;
	}
	else { // case 3: file doesn't exist, caller must make it
		location_ss << -1 << ' ' << line_num << ' ' << -1 << ' ' << current_column;
	}
	return location_ss.str();
}
void gen_follow_func(User* home_user, User* other_user, char op, char group){
/*
gen_follow_func() takes in two user pointers, a character specifying the operation, and a character specifying the group.
The op can either be "f" for follow or "u" for unfollow. The group can be "1" to indicate home_user performing the op
to other_user (following/unfollowing them), or "2" to indicate other_user reciprocating the op from home_user (by
being followed or unfollowed by home_user in the database).
* The subsequent operation is then performed.
*/
if( !(op=='f' || op=='u') || !(group=='1' || group=='2') ){
cerr << "Invalid options passed to gen_follow_func: " << op << "," << group << endl;
exit(16);
} // if invalid options passed, exit
int user_row=-1;
if(group=='1') { user_row=home_user->id/USERS_PER_FILE; }
else if (group=='2') { user_row=other_user->id/USERS_PER_FILE; }
stringstream user_fnm_ss;
user_fnm_ss << FILE_PATH << "/users/u_" << user_row << ".txt";
unique_lock<mutex> user_lock(mt_open(user_fnm_ss.str())); // OBTAIN USER FILE LOCK (for flushing)
// already_following will lock each cult follow on a as needed basis
// following the ordering from cult file 0 to cult file n to avoid deadlocks
int followee_id=other_user->id;
int follower_id=home_user->id;
stringstream capture_ss;
// catch the already_following or already_followed_by relationship
string returned;
if(group=='1'){ returned=already_following(home_user, followee_id); }
else if(group=='2') { returned=already_followed_by(other_user, follower_id); }
capture_ss.str(returned);
string fnm;
int lnm=-2, flx_num=-2;
if( !(capture_ss.str().empty()) ){ capture_ss >> fnm >> lnm >> flx_num; }
// if the relationship doesn't exist, and you want to make it
// or the relationship does exist, and you want to destroy it
if( ((flx_num==-2)&&(op=='f')) || ((flx_num!=-2)&&(op=='u')) ){
string file_name(fnm); // if unfollow(), you want the file where the relationship is
// if follow, line below sets this to the proper file name
int line_num=lnm, col_num=-2, col_file_num=-2;
// if unfollow(), you want the line of the relationship; if follow, lines below set this appropriately
// if follow is called, find first free spot for either relationship and set col_num to that
if(op=='f'){ // create a new file if necessary
stringstream capture_ss_2;
if(group=='1'){ capture_ss_2.str(first_free(home_user, 'e')); }
else if(group=='2') { capture_ss_2.str(first_free(other_user, 'r')); }
capture_ss_2 >> file_name >> line_num >> col_num >> col_file_num;
// file does not exist yet and no free spots found; must make new file
if(atoi(file_name.c_str())==-1){
int row_val=-2;
char type;
if(group=='1'){
row_val=home_user->id/USERS_PER_FILE;
type='e';
}
else if(group=='2') {
row_val=other_user->id/USERS_PER_FILE;
type='r';
}
// choose the name based on if home_user following other_user or other_user being followed by home_user
stringstream file_name_ss;
file_name_ss << FILE_PATH << "/flw" << type << "s/f" << type << "_r" << row_val << "c" << col_file_num << ".txt";
file_name=file_name_ss.str();
// here only because defined create_file() to take strings
stringstream conv;
conv << row_val << " " << col_file_num;
string rv, cfn;
conv >> rv >> cfn;
// create the necessary file
string file_type;
if(group=='1') { file_type="followees"; }
if(group=='2') { file_type="followers"; }