diff --git a/PendingReleaseNotes b/PendingReleaseNotes index 1fec78e7c2ffd..a9880942b5a7e 100644 --- a/PendingReleaseNotes +++ b/PendingReleaseNotes @@ -4,3 +4,18 @@ v0.67 * The output of 'ceph status --format=json' or 'ceph -s --format=json' has changed to return status information in a more structured and usable format. + +* The 'ceph pg dump_stuck [threshold]' command used to require a + --threshold or -t prefix to the threshold argument, but now does + not. + +* Many more ceph commands now output formatted information; select + with '--format=<format>', where <format> can be 'json', 'json-pretty', + 'xml', or 'xml-pretty'. + +* ceph-rest-api, a wrapper around ceph_rest_api.py, can be used to start + up a test single-threaded HTTP server that provides access to cluster + information and administration in very similar ways to the ceph + commandline tool. ceph_rest_api.py can be used as a WSGI application + for deployment in a more-capable web server. See ceph-rest-api.8 + for more. diff --git a/qa/workunits/cephtool/test.sh b/qa/workunits/cephtool/test.sh index 9dd093de687dc..d759d675276ed 100755 --- a/qa/workunits/cephtool/test.sh +++ b/qa/workunits/cephtool/test.sh @@ -79,10 +79,8 @@ if ! 
grep "$mymsg" /tmp/$$; then fi kill $wpid -ceph mds cluster_down --no-log-to-stderr 2>&1 | grep "marked mdsmap DOWN" -expect_false ceph mds cluster_down -ceph mds cluster_up --no-log-to-stderr 2>&1 | grep "unmarked mdsmap DOWN" -expect_false ceph mds cluster_up +ceph mds cluster_down +ceph mds cluster_up ceph mds compat rm_incompat 4 ceph mds compat rm_incompat 4 @@ -167,9 +165,20 @@ ceph osd getmaxosd | grep 'max_osd = 10' ceph osd setmaxosd $save ceph osd getmaxosd | grep "max_osd = $save" +for id in `ceph osd ls` ; do + ceph tell osd.$id version +done + id=`ceph osd create` ceph osd lost $id --yes-i-really-mean-it ceph osd rm $id + +uuid=`uuidgen` +id=`ceph osd create $uuid` +id2=`ceph osd create $uuid` +[ "$id" = "$id2" ] +ceph osd rm $id + ceph osd ls ceph osd lspools | grep data ceph osd map data foo | grep 'pool.*data.*object.*foo.*pg.*up.*acting' @@ -191,11 +200,6 @@ ceph osd pool delete data3 data3 --yes-i-really-really-mean-it ceph osd stat | grep up, -for id in `ceph osd ls` ; do - ceph tell osd.$id version -done - - ceph pg debug unfound_objects_exist ceph pg debug degraded_pgs_exist ceph pg deep-scrub 0.0 diff --git a/src/auth/cephx/CephxKeyServer.h b/src/auth/cephx/CephxKeyServer.h index 905eb71fe1724..bc30fef640b16 100644 --- a/src/auth/cephx/CephxKeyServer.h +++ b/src/auth/cephx/CephxKeyServer.h @@ -268,6 +268,9 @@ class KeyServer : public KeyStore { map::const_iterator b = data.secrets_begin(); return (b != data.secrets_end()); } + int get_num_secrets() { + return data.secrets.size(); + } /*void add_rotating_secret(uint32_t service_id, ExpiringCryptoKey& key) { Mutex::Locker l(lock); diff --git a/src/ceph-disk b/src/ceph-disk index 35e4592ec4d76..db988b0d5e346 100755 --- a/src/ceph-disk +++ b/src/ceph-disk @@ -198,69 +198,95 @@ def maybe_mkdir(*a, **kw): raise +# a device "name" is something like +# sdb +# cciss!c0d1 +def get_dev_name(path): + """ + get device name from path. 
e.g., /dev/sda -> sda, /dev/cciss/c0d1 -> cciss!c0d1 + """ + assert path.startswith('/dev/') + base = path[5:] + return base.replace('/', '!') + +# a device "path" is something like +# /dev/sdb +# /dev/cciss/c0d1 +def get_dev_path(name): + """ + get a path (/dev/...) from a name (cciss!c0d1) + """ + return '/dev/' + name.replace('!', '/') + +def get_dev_relpath(name): + """ + get a relative path to /dev from a name (cciss!c0d1) + """ + return name.replace('!', '/') + + +def get_partition_dev(dev, pnum): + """ + get the device name for a partition + + assume that partitions are named like the base dev, with a number, and optionally + some intervening characters (like 'p'). e.g., + + sda 1 -> sda1 + cciss/c0d1 1 -> cciss!c0d1p1 + """ + name = get_dev_name(os.path.realpath(dev)) + partname = None + for f in os.listdir(os.path.join('/sys/block', name)): + if f.startswith(name) and f.endswith(str(pnum)): + # we want the shortest name that starts with the base name and ends with the partition number + if not partname or len(f) < len(partname): + partname = f + if partname: + return get_dev_path(partname) + else: + raise Error('partition %d for %s does not appear to exist' % (pnum, dev)) + def list_all_partitions(): """ Return a list of devices and partitions """ dev_part_list = {} - for name in os.listdir('/dev/disk/by-path'): - target = os.readlink(os.path.join('/dev/disk/by-path', name)) - dev = target.split('/')[-1] - #print "name %s target %s dev %s" % (name, target, dev) - (baser) = re.search('(.*)-part\d+$', name) - if baser is not None: - basename = baser.group(1) - #print 'basename %s' % basename - base = os.readlink(os.path.join('/dev/disk/by-path', basename)).split('/')[-1] - if base not in dev_part_list: - dev_part_list[base] = [] - dev_part_list[base].append(dev) - else: - if dev not in dev_part_list: - dev_part_list[dev] = [] + for name in os.listdir('/sys/block'): + if not os.path.exists(os.path.join('/sys/block', name, 'device')): + continue + 
dev_part_list[name] = list_partitions(name) return dev_part_list - -def list_partitions(disk): +def list_partitions(basename): """ - Return a list of partitions on the given device + Return a list of partitions on the given device name """ - disk = os.path.realpath(disk) - assert not is_partition(disk) - assert disk.startswith('/dev/') - base = disk.split('/')[-1] partitions = [] - for name in os.listdir(os.path.join('/sys/block', base)): - if name.startswith(base): - partitions.append('/dev/' + name) + for name in os.listdir(os.path.join('/sys/block', basename)): + if name.startswith(basename): + partitions.append(name) return partitions def is_partition(dev): """ - Check whether a given device is a partition or a full disk. + Check whether a given device path is a partition or a full disk. """ dev = os.path.realpath(dev) if not stat.S_ISBLK(os.lstat(dev).st_mode): raise Error('not a block device', dev) - # we can't tell just from the name of the device if it is a - # partition or not. look in the by-path dir and see if the - # referring symlink ends in -partNNN. - name = dev.split('/')[-1] - for name in os.listdir('/dev/disk/by-path'): - target = os.readlink(os.path.join('/dev/disk/by-path', name)) - cdev = target.split('/')[-1] - if '/dev/' + cdev != dev: - continue - (baser) = re.search('(.*)-part\d+$', name) - if baser is not None: + name = get_dev_name(dev) + if os.path.exists(os.path.join('/sys/block', name)): + return False + + # make sure it is a partition of something else + for basename in os.listdir('/sys/block'): + if os.path.exists(os.path.join('/sys/block', basename, name)): return True - else: - return False - # hrm, don't know... - return False + raise Error('not a disk or partition', dev) def is_mounted(dev): @@ -288,7 +314,7 @@ def is_held(dev): """ assert os.path.exists(dev) dev = os.path.realpath(dev) - base = dev.split('/')[-1] + base = get_dev_name(dev) # full disk? 
directory = '/sys/block/{base}/holders'.format(base=base) @@ -320,7 +346,9 @@ def verify_not_in_use(dev): if holders: raise Error('Device is in use by a device-mapper mapping (dm-crypt?)' % dev, ','.join(holders)) else: - for partition in list_partitions(dev): + basename = get_dev_name(os.path.realpath(dev)) + for partname in list_partitions(basename): + partition = get_dev_path(partname) if is_mounted(partition): raise Error('Device is mounted', partition) holders = is_held(partition) @@ -762,16 +790,6 @@ def zap(dev): raise Error(e) -def get_udev_version(): - version = _check_output( - args=[ - 'udevadm', - '--version', - ], - ) - return int(version) - - def prepare_journal_dev( data, journal, @@ -840,30 +858,9 @@ def prepare_journal_dev( ], ) - if get_udev_version() >= 172: - journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format( - journal_uuid=journal_uuid, - ) - else: - # udev prior to version 172 doesn't create by-partuuid directory - # use by-path symlink instead. This is the third symlink returned - # by udevadm when queried. 
- LOG.debug('Using alternate persistant name for journal symlink') - symlinks = _check_output( - args=[ - 'udevadm', - 'info', - '--query=symlink', - '--name={name}'.format(name=os.path.basename(journal)), - ], - ) - journal_symlink = None - for udev_line in symlinks.split(): - if 'by-path' in udev_line: - journal_symlink = '/dev/{symlink}-part{num}'.format(symlink=str(udev_line), num=num) - break - if not journal_symlink: - raise Error('Unable to get device by path from udev') + journal_symlink = '/dev/disk/by-partuuid/{journal_uuid}'.format( + journal_uuid=journal_uuid, + ) journal_dmcrypt = None if journal_dm_keypath: @@ -1042,7 +1039,7 @@ def prepare_dev( except subprocess.CalledProcessError as e: raise Error(e) - rawdev = '{data}1'.format(data=data) + rawdev = get_partition_dev(data, 1) dev = None if osd_dm_keypath: @@ -2009,7 +2006,7 @@ def is_suppressed(path): try: if not disk.startswith('/dev/') or not stat.S_ISBLK(os.lstat(path).st_mode): return False - base = disk.split('/')[-1] + base = get_dev_name(disk) while len(base): if os.path.exists(SUPPRESS_PREFIX + base): return True @@ -2023,7 +2020,7 @@ def set_suppress(path): raise Error('does not exist', path) if not stat.S_ISBLK(os.lstat(path).st_mode): raise Error('not a block device', path) - base = disk.split('/')[-1] + base = get_dev_name(disk) with file(SUPPRESS_PREFIX + base, 'w') as f: pass @@ -2036,7 +2033,7 @@ def unset_suppress(path): if not stat.S_ISBLK(os.lstat(path).st_mode): raise Error('not a block device', path) assert disk.startswith('/dev/') - base = disk.split('/')[-1] + base = get_dev_name(disk) fn = SUPPRESS_PREFIX + base if not os.path.exists(fn): diff --git a/src/ceph.in b/src/ceph.in index 10a1faa205ea4..8c1f473066781 100755 --- a/src/ceph.in +++ b/src/ceph.in @@ -142,8 +142,6 @@ def parse_cmdargs(args=None, target=''): parser.add_argument('-f', '--format', choices=['json', 'json-pretty', 'xml', 'xml-pretty', 'plain'], dest='output_format') - # for pg dump_stuck - 
parser.add_argument('--threshold', type=int, help='number of seconds for a pg to be considered stuck for pg dump_stuck') # returns a Namespace with the parsed args, and a list of all extras parsed_args, extras = parser.parse_known_args(args) @@ -683,8 +681,6 @@ def main(): compat = True if parsed_args.output_format: childargs.extend(['--format', parsed_args.output_format]) - if parsed_args.threshold: - childargs.extend(['--threshold', parsed_args.threshold]) ret, outbuf, outs = send_command(cluster_handle, target, childargs, inbuf) elif ret: @@ -699,6 +695,15 @@ def main(): ret, outbuf, outs = new_style_command(parsed_args, childargs, target, sigdict, inbuf, verbose) + # debug tool: send any successful command *again* to + # verify that it is idempotent. + if not ret and 'CEPH_CLI_TEST_DUP_COMMAND' in os.environ: + ret, outbuf, outs = new_style_command(parsed_args, childargs, target, + sigdict, inbuf, verbose) + if ret < 0: + ret = -ret + print >> sys.stderr, prefix + 'Second attempt of previously successful command failed with {0}: {1}'.format(errno.errorcode[ret], outs) + if ret < 0: ret = -ret print >> sys.stderr, prefix + 'Error {0}: {1}'.format(errno.errorcode[ret], outs) diff --git a/src/ceph_mon.cc b/src/ceph_mon.cc index 7705bdbfb32d7..6ac22ba20e5c8 100644 --- a/src/ceph_mon.cc +++ b/src/ceph_mon.cc @@ -76,7 +76,8 @@ int obtain_monmap(MonitorDBStore &store, bufferlist &bl) } } - if (store.exists("mon_sync", "in_sync")) { + if (store.exists("mon_sync", "in_sync") + || store.exists("mon_sync", "force_sync")) { dout(10) << __func__ << " detected aborted sync" << dendl; if (store.exists("mon_sync", "latest_monmap")) { int err = store.get("mon_sync", "latest_monmap", bl); diff --git a/src/client/Client.cc b/src/client/Client.cc index ae7ddf65db485..eb7502c1530c9 100644 --- a/src/client/Client.cc +++ b/src/client/Client.cc @@ -1882,7 +1882,7 @@ void Client::handle_mds_map(MMDSMap* m) int newstate = mdsmap->get_state(p->first); if (!mdsmap->is_up(p->first) || 
mdsmap->get_inst(p->first) != p->second->inst) { - messenger->mark_down(p->second->inst.addr); + messenger->mark_down(p->second->con); if (mdsmap->is_up(p->first)) p->second->inst = mdsmap->get_inst(p->first); } else if (oldstate == newstate) diff --git a/src/common/config_opts.h b/src/common/config_opts.h index a959b4db40144..defb71ee514c8 100644 --- a/src/common/config_opts.h +++ b/src/common/config_opts.h @@ -143,6 +143,7 @@ OPTION(mon_osd_down_out_interval, OPT_INT, 300) // seconds OPTION(mon_osd_down_out_subtree_limit, OPT_STR, "rack") // smallest crush unit/type that we will not automatically mark out OPTION(mon_osd_min_up_ratio, OPT_DOUBLE, .3) // min osds required to be up to mark things down OPTION(mon_osd_min_in_ratio, OPT_DOUBLE, .3) // min osds required to be in to mark things out +OPTION(mon_osd_max_op_age, OPT_DOUBLE, 32) // max op age before we get concerned (make it a power of 2) OPTION(mon_stat_smooth_intervals, OPT_INT, 2) // smooth stats over last N PGMap maps OPTION(mon_lease, OPT_FLOAT, 5) // lease interval OPTION(mon_lease_renew_interval, OPT_FLOAT, 3) // on leader, to renew the lease @@ -386,13 +387,13 @@ OPTION(osd_pool_default_min_size, OPT_INT, 0) // 0 means no specific default; c OPTION(osd_pool_default_pg_num, OPT_INT, 8) // number of PGs for new pools. Configure in global or mon section of ceph.conf OPTION(osd_pool_default_pgp_num, OPT_INT, 8) // number of PGs for placement purposes. 
Should be equal to pg_num OPTION(osd_pool_default_flags, OPT_INT, 0) // default flags for new pools -// default flags for new pools -OPTION(osd_pool_default_flag_hashpspool, OPT_BOOL, true) +OPTION(osd_pool_default_flag_hashpspool, OPT_BOOL, false) // use new pg hashing to prevent pool/pg overlap OPTION(osd_map_dedup, OPT_BOOL, true) OPTION(osd_map_cache_size, OPT_INT, 500) OPTION(osd_map_message_max, OPT_INT, 100) // max maps per MOSDMap message OPTION(osd_map_share_max_epochs, OPT_INT, 100) // cap on # of inc maps we send to peers, clients OPTION(osd_op_threads, OPT_INT, 2) // 0 == no threading +OPTION(osd_peering_wq_batch_size, OPT_U64, 20) OPTION(osd_op_pq_max_tokens_per_priority, OPT_U64, 4194304) OPTION(osd_op_pq_min_cost, OPT_U64, 65536) OPTION(osd_disk_threads, OPT_INT, 1) @@ -461,6 +462,7 @@ OPTION(osd_debug_drop_pg_create_duration, OPT_INT, 1) OPTION(osd_debug_drop_op_probability, OPT_DOUBLE, 0) // probability of stalling/dropping a client op OPTION(osd_debug_op_order, OPT_BOOL, false) OPTION(osd_debug_verify_snaps_on_info, OPT_BOOL, false) +OPTION(osd_debug_verify_stray_on_activate, OPT_BOOL, false) OPTION(osd_debug_skip_full_check_in_backfill_reservation, OPT_BOOL, false) OPTION(osd_op_history_size, OPT_U32, 20) // Max number of completed ops to track OPTION(osd_op_history_duration, OPT_U32, 600) // Oldest completed op to track diff --git a/src/crush/CrushWrapper.cc b/src/crush/CrushWrapper.cc index 8a9addfb6c280..e96e6123aaba1 100644 --- a/src/crush/CrushWrapper.cc +++ b/src/crush/CrushWrapper.cc @@ -333,6 +333,9 @@ int CrushWrapper::insert_item(CephContext *cct, int item, float weight, string n ldout(cct, 5) << "insert_item item " << item << " weight " << weight << " name " << name << " loc " << loc << dendl; + if (!is_valid_crush_name(name)) + return -EINVAL; + if (name_exists(name)) { if (get_item_id(name) != item) { ldout(cct, 10) << "device name '" << name << "' already exists as id " @@ -473,6 +476,10 @@ int 
CrushWrapper::create_or_move_item(CephContext *cct, int item, float weight, { int ret = 0; int old_iweight; + + if (!is_valid_crush_name(name)) + return -EINVAL; + if (check_item_loc(cct, item, loc, &old_iweight)) { ldout(cct, 5) << "create_or_move_item " << item << " already at " << loc << dendl; } else { @@ -497,6 +504,9 @@ int CrushWrapper::update_item(CephContext *cct, int item, float weight, string n << " name " << name << " loc " << loc << dendl; int ret = 0; + if (!is_valid_crush_name(name)) + return -EINVAL; + // compare quantized (fixed-point integer) weights! int iweight = (int)(weight * (float)0x10000); int old_iweight; @@ -1109,3 +1119,20 @@ void CrushWrapper::generate_test_instances(list& o) o.push_back(new CrushWrapper); // fixme } + + +bool CrushWrapper::is_valid_crush_name(const string& s) +{ + if (s.empty()) + return false; + for (string::const_iterator p = s.begin(); p != s.end(); ++p) { + if (!(*p == '-') && + !(*p == '_') && + !(*p == '.') && + !(*p >= '0' && *p <= '9') && + !(*p >= 'A' && *p <= 'Z') && + !(*p >= 'a' && *p <= 'z')) + return false; + } + return true; +} diff --git a/src/crush/CrushWrapper.h b/src/crush/CrushWrapper.h index f5a88c8bdd555..3d07a281956ed 100644 --- a/src/crush/CrushWrapper.h +++ b/src/crush/CrushWrapper.h @@ -200,10 +200,13 @@ class CrushWrapper { return p->second.c_str(); return 0; } - void set_item_name(int i, const string& name) { + int set_item_name(int i, const string& name) { + if (!is_valid_crush_name(name)) + return -EINVAL; name_map[i] = name; if (have_rmaps) name_rmap[name] = i; + return 0; } // rule names @@ -790,6 +793,9 @@ class CrushWrapper { void dump_rules(Formatter *f) const; void list_rules(Formatter *f) const; static void generate_test_instances(list& o); + + + static bool is_valid_crush_name(const string& s); }; WRITE_CLASS_ENCODER(CrushWrapper) diff --git a/src/include/ceph_features.h b/src/include/ceph_features.h index f72e1028898b2..362a459bde683 100644 --- a/src/include/ceph_features.h +++ 
b/src/include/ceph_features.h @@ -40,6 +40,28 @@ #define CEPH_FEATURE_MON_SCRUB (1ULL<<33) #define CEPH_FEATURE_OSD_PACKED_RECOVERY (1ULL<<34) +/* + * The introduction of CEPH_FEATURE_OSD_SNAPMAPPER caused the feature + * vector to evaluate to 64 bit ~0. To cope, we designate 1ULL << 63 + * to mean 33 bit ~0, and introduce a helper below to do the + * translation. + * + * This was introduced by commit + * 9ea02b84104045c2ffd7e7f4e7af512953855ecd v0.58-657-g9ea02b8 + * and fixed by commit + * 4255b5c2fb54ae40c53284b3ab700fdfc7e61748 v0.65-263-g4255b5c + */ +#define CEPH_FEATURE_RESERVED (1ULL<<63) + +static inline unsigned long long ceph_sanitize_features(unsigned long long f) { + if (f & CEPH_FEATURE_RESERVED) { + /* everything through OSD_SNAPMAPPER */ + return 0x1ffffffffull; + } else { + return f; + } +} + /* * Features supported. Should be everything above. */ diff --git a/src/init-ceph.in b/src/init-ceph.in index 0ed906be98768..b0ed353f8e7fa 100644 --- a/src/init-ceph.in +++ b/src/init-ceph.in @@ -431,7 +431,7 @@ for name in $what; do done # activate latent osds? -if [ "$command" = "start" ]; then +if [ "$command" = "start" -a "$BINDIR" != "." 
]; then if [ "$*" = "" ] || echo $* | grep -q ^osd\$ ; then ceph-disk activate-all fi diff --git a/src/mds/Server.cc b/src/mds/Server.cc index 38dad1948a457..466d481845659 100644 --- a/src/mds/Server.cc +++ b/src/mds/Server.cc @@ -2345,7 +2345,7 @@ void Server::handle_client_lookup_ino(MDRequest *mdr) dout(10) << "reply to lookup_ino " << *in << dendl; MClientReply *reply = new MClientReply(req, 0); - reply_request(mdr, reply, in, in->get_parent_dn()); + reply_request(mdr, reply, in, NULL); } void Server::_lookup_ino_2(MDRequest *mdr, int r) diff --git a/src/messages/MClientReconnect.h b/src/messages/MClientReconnect.h index 21f0021f31f41..4e2839cab695a 100644 --- a/src/messages/MClientReconnect.h +++ b/src/messages/MClientReconnect.h @@ -53,6 +53,7 @@ class MClientReconnect : public Message { } void encode_payload(uint64_t features) { + data.clear(); if (features & CEPH_FEATURE_MDSENC) { ::encode(caps, data); } else if (features & CEPH_FEATURE_FLOCK) { diff --git a/src/mon/AuthMonitor.cc b/src/mon/AuthMonitor.cc index 9f8c2ea260d81..629451b5eac7f 100644 --- a/src/mon/AuthMonitor.cc +++ b/src/mon/AuthMonitor.cc @@ -897,8 +897,8 @@ bool AuthMonitor::prepare_command(MMonCommand *m) KeyServerData::Incremental auth_inc; auth_inc.name = entity; if (!mon->key_server.contains(auth_inc.name)) { - ss << "couldn't find entry " << entity; - err = -ENOENT; + ss << "entity " << entity << " does not exist"; + err = 0; goto done; } auth_inc.op = KeyServerData::AUTH_INC_DEL; @@ -997,3 +997,13 @@ void AuthMonitor::upgrade_format() propose_pending(); } } + +void AuthMonitor::dump_info(Formatter *f) +{ + /*** WARNING: do not include any privileged information here! 
***/ + f->open_object_section("auth"); + f->dump_unsigned("first_committed", get_first_committed()); + f->dump_unsigned("last_committed", get_last_committed()); + f->dump_unsigned("num_secrets", mon->key_server.get_num_secrets()); + f->close_section(); +} diff --git a/src/mon/AuthMonitor.h b/src/mon/AuthMonitor.h index f538e737856a6..30d567a77caf6 100644 --- a/src/mon/AuthMonitor.h +++ b/src/mon/AuthMonitor.h @@ -157,6 +157,8 @@ class AuthMonitor : public PaxosService { void pre_auth(MAuth *m); void tick(); // check state, take actions + + void dump_info(Formatter *f); }; diff --git a/src/mon/Elector.cc b/src/mon/Elector.cc index 4b1221d2c316c..1650a997b2d09 100644 --- a/src/mon/Elector.cc +++ b/src/mon/Elector.cc @@ -55,7 +55,8 @@ void Elector::bump_epoch(epoch_t e) MonitorDBStore::Transaction t; t.put(Monitor::MONITOR_NAME, "election_epoch", epoch); mon->store->apply_transaction(t); - mon->reset(); + + mon->join_election(); // clear up some state electing_me = false; @@ -198,7 +199,6 @@ void Elector::handle_propose(MMonElection *m) dout(5) << " got propose from old epoch, quorum is " << mon->quorum << ", " << m->get_source() << " must have just started" << dendl; // we may be active; make sure we reset things in the monitor appropriately. - mon->reset(); mon->start_election(); } else { dout(5) << " ignoring old propose" << dendl; @@ -215,7 +215,6 @@ void Elector::handle_propose(MMonElection *m) } else { // wait, i should win! 
if (!electing_me) { - mon->reset(); mon->start_election(); } } diff --git a/src/mon/MDSMonitor.cc b/src/mon/MDSMonitor.cc index 169834ab0e311..172f0f1e3dbee 100644 --- a/src/mon/MDSMonitor.cc +++ b/src/mon/MDSMonitor.cc @@ -529,6 +529,8 @@ void MDSMonitor::dump_info(Formatter *f) f->open_object_section("mdsmap"); mdsmap.dump(f); f->close_section(); + + f->dump_unsigned("mdsmap_first_committed", get_first_committed()); } bool MDSMonitor::preprocess_command(MMonCommand *m) @@ -884,23 +886,19 @@ bool MDSMonitor::prepare_command(MMonCommand *m) } else if (prefix == "mds cluster_down") { if (pending_mdsmap.test_flag(CEPH_MDSMAP_DOWN)) { ss << "mdsmap already marked DOWN"; - r = -EPERM; } else { pending_mdsmap.set_flag(CEPH_MDSMAP_DOWN); ss << "marked mdsmap DOWN"; - r = 0; } - + r = 0; } else if (prefix == "mds cluster_up") { if (pending_mdsmap.test_flag(CEPH_MDSMAP_DOWN)) { pending_mdsmap.clear_flag(CEPH_MDSMAP_DOWN); ss << "unmarked mdsmap DOWN"; - r = 0; } else { ss << "mdsmap not marked DOWN"; - r = -EPERM; } - + r = 0; } else if (prefix == "mds compat rm_compat") { int64_t f; cmd_getval(g_ceph_context, cmdmap, "feature", f); @@ -935,6 +933,8 @@ bool MDSMonitor::prepare_command(MMonCommand *m) int64_t poolid; cmd_getval(g_ceph_context, cmdmap, "poolid", poolid); r = pending_mdsmap.remove_data_pool(poolid); + if (r == -ENOENT) + r = 0; if (r == 0) ss << "removed data pool " << poolid << " from mdsmap"; diff --git a/src/mon/MonClient.cc b/src/mon/MonClient.cc index 57f30063fa7c2..8139b0259bf12 100644 --- a/src/mon/MonClient.cc +++ b/src/mon/MonClient.cc @@ -516,6 +516,7 @@ void MonClient::_reopen_session(int rank, string name) // throw out version check requests while (!version_requests.empty()) { finisher.queue(version_requests.begin()->second->context, -EAGAIN); + delete version_requests.begin()->second; version_requests.erase(version_requests.begin()); } diff --git a/src/mon/MonCommands.h b/src/mon/MonCommands.h index 9ec74673a9ea6..5b950ca1aefe5 100644 --- 
a/src/mon/MonCommands.h +++ b/src/mon/MonCommands.h @@ -122,7 +122,7 @@ COMMAND("pg dump_pools_json", "show pg pools info in json only",\ COMMAND("pg dump_stuck " \ "name=stuckops,type=CephChoices,strings=inactive|unclean|stale,n=N,req=false " \ "name=threshold,type=CephInt,req=false", - "show information about stuck pgs [--threshold=seconds to consider stuck]",\ + "show information about stuck pgs",\ "pg", "r", "cli,rest") COMMAND("pg map name=pgid,type=CephPgid", "show mapping of pg to osds", \ "pg", "r", "cli,rest") @@ -209,7 +209,6 @@ COMMAND("report name=tags,type=CephString,n=N,req=false", \ COMMAND("quorum_status", "report status of monitor quorum", \ "mon", "r", "cli,rest") COMMAND("mon_status", "report status of monitors", "mon", "r", "cli,rest") -COMMAND("sync status", "report status of monitors", "mon", "r", "cli,rest") COMMAND("sync force " \ "name=validate1,type=CephChoices,strings=--yes-i-really-mean-it " \ "name=validate2,type=CephChoices,strings=--i-know-what-i-am-doing", \ @@ -357,55 +356,55 @@ COMMAND("osd setcrushmap", "set crush map from input file", \ COMMAND("osd crush set", "set crush map from input file", \ "osd", "rw", "cli,rest") COMMAND("osd crush add-bucket " \ - "name=name,type=CephString " \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] 
" \ "name=type,type=CephString", \ "add no-parent (probably root) crush bucket of type ", \ "osd", "rw", "cli,rest") COMMAND("osd crush set " \ "name=id,type=CephOsdName " \ "name=weight,type=CephFloat,range=0.0 " \ - "name=args,type=CephString,n=N", \ + "name=args,type=CephString,n=N,goodchars=[A-Za-z0-9-_.=]", \ "set crushmap entry for to with location ", \ "osd", "rw", "cli,rest") COMMAND("osd crush add " \ "name=id,type=CephOsdName " \ "name=weight,type=CephFloat,range=0.0 " \ - "name=args,type=CephString,n=N", \ + "name=args,type=CephString,n=N,goodchars=[A-Za-z0-9-_.=]", \ "add crushmap entry for with and location ", \ "osd", "rw", "cli,rest") COMMAND("osd crush create-or-move " \ "name=id,type=CephOsdName " \ "name=weight,type=CephFloat,range=0.0 " \ - "name=args,type=CephString,n=N", \ + "name=args,type=CephString,n=N,goodchars=[A-Za-z0-9-_.=]", \ "create entry or move existing entry for at/to location ", \ "osd", "rw", "cli,rest") COMMAND("osd crush move " \ "name=id,type=CephOsdName " \ - "name=args,type=CephString,n=N", \ + "name=args,type=CephString,n=N,goodchars=[A-Za-z0-9-_.=]", \ "move existing entry for to location ", \ "osd", "rw", "cli,rest") COMMAND("osd crush link " \ "name=name,type=CephString " \ - "name=args,type=CephString,n=N", \ + "name=args,type=CephString,n=N,goodchars=[A-Za-z0-9-_.=]", \ "link existing entry for under location ", \ "osd", "rw", "cli,rest") COMMAND("osd crush rm " \ - "name=name,type=CephString " \ - "name=ancestor,type=CephString,req=false", \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] " \ + "name=ancestor,type=CephString,req=false,goodchars=[A-Za-z0-9-_.]", \ "remove from crush map (everywhere, or just at ",\ "osd", "rw", "cli,rest") COMMAND("osd crush remove " \ - "name=name,type=CephString " \ - "name=ancestor,type=CephString,req=false", \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] 
" \ + "name=ancestor,type=CephString,req=false,goodchars=[A-Za-z0-9-_.]", \ "remove from crush map (everywhere, or just at ", \ "osd", "rw", "cli,rest") COMMAND("osd crush unlink " \ - "name=name,type=CephString " \ - "name=ancestor,type=CephString,req=false", \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] " \ + "name=ancestor,type=CephString,req=false,goodchars=[A-Za-z0-9-_.]", \ "unlink from crush map (everywhere, or just at ", \ "osd", "rw", "cli,rest") COMMAND("osd crush reweight " \ - "name=name,type=CephString " \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] " \ "name=weight,type=CephFloat,range=0.0", \ "change 's weight to in crush map", \ "osd", "rw", "cli,rest") @@ -413,12 +412,13 @@ COMMAND("osd crush tunables " \ "name=profile,type=CephChoices,strings=legacy|argonaut|bobtail|optimal|default", \ "set crush tunables values to ", "osd", "rw", "cli,rest") COMMAND("osd crush rule create-simple " \ - "name=name,type=CephString " \ - "name=root,type=CephString " \ - "name=type,type=CephString", \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] " \ + "name=root,type=CephString,goodchars=[A-Za-z0-9-_.] " \ + "name=type,type=CephString,goodchars=[A-Za-z0-9-_.]", "create crush rule in of type ", \ "osd", "rw", "cli,rest") COMMAND("osd crush rule rm " \ + "name=name,type=CephString,goodchars=[A-Za-z0-9-_.] 
" \ "name=name,type=CephString", \ "remove crush rule ", "osd", "rw", "cli,rest") COMMAND("osd setmaxosd " \ diff --git a/src/mon/Monitor.cc b/src/mon/Monitor.cc index 914714d733ca9..90750dd7b1167 100644 --- a/src/mon/Monitor.cc +++ b/src/mon/Monitor.cc @@ -644,7 +644,7 @@ void Monitor::bootstrap() // reset state = STATE_PROBING; - reset(); + _reset(); // sync store if (g_conf->mon_compact_on_bootstrap) { @@ -708,9 +708,12 @@ void Monitor::_add_bootstrap_peer_hint(string cmd, string args, ostream& ss) } // called by bootstrap(), or on leader|peon -> electing -void Monitor::reset() +void Monitor::_reset() { - dout(10) << "reset" << dendl; + dout(10) << __func__ << dendl; + + assert(state == STATE_ELECTING || + state == STATE_PROBING); cancel_probe_timeout(); timecheck_finish(); @@ -1033,7 +1036,7 @@ void Monitor::handle_sync_get_chunk(MMonSync *m) sp.reset_timeout(g_ceph_context, g_conf->mon_sync_timeout * 2); if (sp.last_committed < paxos->get_first_committed() && - paxos->get_first_committed() >= 1) { + paxos->get_first_committed() > 1) { dout(10) << __func__ << " sync requester fell behind paxos, their lc " << sp.last_committed << " < our fc " << paxos->get_first_committed() << dendl; sync_providers.erase(m->cookie); @@ -1407,14 +1410,21 @@ void Monitor::handle_probe_reply(MMonProbe *m) m->put(); } +void Monitor::join_election() +{ + dout(10) << __func__ << dendl; + state = STATE_ELECTING; + _reset(); +} + void Monitor::start_election() { dout(10) << "start_election" << dendl; + state = STATE_ELECTING; + _reset(); cancel_probe_timeout(); - // call a new election - state = STATE_ELECTING; clog.info() << "mon." 
<< name << " calling new monitor election\n"; elector.call_election(); } @@ -1447,18 +1457,15 @@ epoch_t Monitor::get_epoch() void Monitor::win_election(epoch_t epoch, set& active, uint64_t features) { - if (!is_electing()) - reset(); - + dout(10) << __func__ << " epoch " << epoch << " quorum " << active + << " features " << features << dendl; + assert(is_electing()); state = STATE_LEADER; leader_since = ceph_clock_now(g_ceph_context); leader = rank; quorum = active; quorum_features = features; outside_quorum.clear(); - dout(10) << "win_election, epoch " << epoch << " quorum is " << quorum - << " features are " << quorum_features - << dendl; clog.info() << "mon." << name << "@" << rank << " won leader election with quorum " << quorum << "\n"; @@ -2123,18 +2130,16 @@ void Monitor::handle_command(MMonCommand *m) osdmon()->dump_info(f.get()); mdsmon()->dump_info(f.get()); pgmon()->dump_info(f.get()); + authmon()->dump_info(f.get()); + + paxos->dump_info(f.get()); f->close_section(); - f->flush(ds); + f->flush(rdata); - bufferlist bl; - bl.append("-------- BEGIN REPORT --------\n"); - bl.append(ds); ostringstream ss2; - ss2 << "\n-------- END REPORT " << bl.crc32c(6789) << " --------\n"; - rdata.append(bl); - rdata.append(ss2.str()); - rs = string(); + ss2 << "report " << rdata.crc32c(6789); + rs = ss2.str(); r = 0; } else if (prefix == "quorum_status") { if (!access_r) { @@ -2205,14 +2210,12 @@ void Monitor::handle_command(MMonCommand *m) string quorumcmd; cmd_getval(g_ceph_context, cmdmap, "quorumcmd", quorumcmd); if (quorumcmd == "exit") { - reset(); start_election(); elector.stop_participating(); rs = "stopped responding to quorum, initiated new election"; r = 0; } else if (quorumcmd == "enter") { elector.start_participating(); - reset(); start_election(); rs = "started responding to quorum, initiated new election"; r = 0; @@ -2315,6 +2318,9 @@ void Monitor::handle_forward(MForward *m) m->msg = NULL; // so ~MForward doesn't delete it req->set_connection(c); + // 
not super accurate, but better than nothing. + req->set_recv_stamp(m->get_recv_stamp()); + /* * note which election epoch this is; we will drop the message if * there is a future election since our peers will resend routed @@ -3236,9 +3242,6 @@ bool Monitor::ms_handle_reset(Connection *con) { dout(10) << "ms_handle_reset " << con << " " << con->get_peer_addr() << dendl; - if (is_shutdown()) - return false; - // ignore lossless monitor sessions if (con->get_peer_type() == CEPH_ENTITY_TYPE_MON) return false; @@ -3247,6 +3250,12 @@ bool Monitor::ms_handle_reset(Connection *con) if (!s) return false; + // break any con <-> session ref cycle + s->con->set_priv(NULL); + + if (is_shutdown()) + return false; + Mutex::Locker l(lock); dout(10) << "reset/close on session " << s->inst << dendl; @@ -3468,16 +3477,16 @@ void Monitor::tick() continue; if (!s->until.is_zero() && s->until < now) { - dout(10) << " trimming session " << s->inst + dout(10) << " trimming session " << s->con << " " << s->inst << " (until " << s->until << " < now " << now << ")" << dendl; - messenger->mark_down(s->inst.addr); + messenger->mark_down(s->con); remove_session(s); } else if (!exited_quorum.is_zero()) { if (now > (exited_quorum + 2 * g_conf->mon_lease)) { // boot the client Session because we've taken too long getting back in - dout(10) << " trimming session " << s->inst - << " because we've been out of quorum too long" << dendl; - messenger->mark_down(s->inst.addr); + dout(10) << " trimming session " << s->con << " " << s->inst + << " because we've been out of quorum too long" << dendl; + messenger->mark_down(s->con); remove_session(s); } } @@ -3807,7 +3816,7 @@ int Monitor::StoreConverter::needs_conversion() bufferlist magicbl; int ret = 0; - dout(10) << __func__ << dendl; + dout(10) << "check if store needs conversion from legacy format" << dendl; _init(); int err = store->mount(); @@ -3845,7 +3854,6 @@ int Monitor::StoreConverter::needs_conversion() int Monitor::StoreConverter::convert() { 
_init(); - assert(!db->create_and_open(std::cerr)); assert(!store->mount()); if (db->exists("mon_convert", "on_going")) { dout(0) << __func__ << " found a mon store in mid-convertion; abort!" diff --git a/src/mon/Monitor.h b/src/mon/Monitor.h index 35bff4207add8..82b08816702b0 100644 --- a/src/mon/Monitor.h +++ b/src/mon/Monitor.h @@ -509,8 +509,11 @@ class Monitor : public Dispatcher { return quorum_features; } +private: + void _reset(); ///< called from bootstrap, start_, or join_election +public: void bootstrap(); - void reset(); + void join_election(); void start_election(); void win_standalone_election(); void win_election(epoch_t epoch, set& q, @@ -785,6 +788,7 @@ class Monitor : public Dispatcher { bool _check_gv_store(); void _init() { + assert(!store); MonitorStore *store_ptr = new MonitorStore(path); store.reset(store_ptr); } diff --git a/src/mon/MonmapMonitor.cc b/src/mon/MonmapMonitor.cc index d04418b6391ab..195f66350d88e 100644 --- a/src/mon/MonmapMonitor.cc +++ b/src/mon/MonmapMonitor.cc @@ -137,13 +137,14 @@ bool MonmapMonitor::preprocess_query(PaxosServiceMessage *m) void MonmapMonitor::dump_info(Formatter *f) { + f->dump_unsigned("monmap_first_committed", get_first_committed()); f->open_object_section("monmap"); mon->monmap->dump(f); + f->close_section(); f->open_array_section("quorum"); for (set::iterator q = mon->get_quorum().begin(); q != mon->get_quorum().end(); ++q) f->dump_int("mon", *q); f->close_section(); - f->close_section(); } bool MonmapMonitor::preprocess_command(MMonCommand *m) diff --git a/src/mon/OSDMonitor.cc b/src/mon/OSDMonitor.cc index 7e76ea271c40d..c8baac58c83bf 100644 --- a/src/mon/OSDMonitor.cc +++ b/src/mon/OSDMonitor.cc @@ -185,12 +185,8 @@ void OSDMonitor::update_from_paxos(bool *need_bootstrap) mon->pgmon()->check_osd_map(osdmap.epoch); } - send_to_waiting(); - check_subs(); - - share_map_with_random_osd(); update_logger(); - + share_map_with_random_osd(); process_failures(); // make sure our feature bits reflect the 
latest map @@ -298,8 +294,17 @@ void OSDMonitor::on_active() { update_logger(); - if (thrash_map && thrash()) - propose_pending(); + send_to_waiting(); + check_subs(); + + if (thrash_map) { + if (mon->is_leader()) { + if (thrash()) + propose_pending(); + } else { + thrash_map = 0; + } + } if (mon->is_leader()) mon->clog.info() << "osdmap " << osdmap << "\n"; @@ -309,6 +314,21 @@ void OSDMonitor::on_active() } } +void OSDMonitor::on_shutdown() +{ + dout(10) << __func__ << dendl; + map >::iterator p = waiting_for_map.begin(); + while (p != waiting_for_map.end()) { + while (!p->second.empty()) { + Message *m = p->second.front(); + dout(20) << " discarding " << m << " " << *m << dendl; + m->put(); + p->second.pop_front(); + } + waiting_for_map.erase(p++); + } +} + void OSDMonitor::update_logger() { dout(10) << "update_logger" << dendl; @@ -1291,6 +1311,7 @@ void OSDMonitor::_reply_map(PaxosServiceMessage *m, epoch_t e) { dout(7) << "_reply_map " << e << " from " << m->get_orig_source_inst() + << " for " << m << dendl; send_latest(m, e); } @@ -1912,6 +1933,8 @@ void OSDMonitor::dump_info(Formatter *f) osdmap.dump(f); f->close_section(); + f->dump_unsigned("osdmap_first_committed", get_first_committed()); + f->open_object_section("crushmap"); osdmap.crush->dump(f); f->close_section(); @@ -2647,7 +2670,11 @@ bool OSDMonitor::prepare_command(MMonCommand *m) int bucketno = newcrush.add_bucket(0, CRUSH_BUCKET_STRAW, CRUSH_HASH_DEFAULT, type, 0, NULL, NULL); - newcrush.set_item_name(bucketno, name); + err = newcrush.set_item_name(bucketno, name); + if (err < 0) { + ss << "error setting bucket name to '" << name << "'"; + goto reply; + } pending_inc.crush.clear(); newcrush.encode(pending_inc.crush); @@ -3289,30 +3316,33 @@ bool OSDMonitor::prepare_command(MMonCommand *m) if (pool < 0) { ss << "unrecognized pool '" << poolstr << "'"; err = -ENOENT; + goto reply; + } + string snapname; + cmd_getval(g_ceph_context, cmdmap, "snap", snapname); + const pg_pool_t *p = 
osdmap.get_pg_pool(pool); + if (p->snap_exists(snapname.c_str())) { + ss << "pool " << poolstr << " snap " << snapname << " already exists"; + err = 0; + goto reply; + } + pg_pool_t *pp = 0; + if (pending_inc.new_pools.count(pool)) + pp = &pending_inc.new_pools[pool]; + if (!pp) { + pp = &pending_inc.new_pools[pool]; + *pp = *p; + } + if (pp->snap_exists(snapname.c_str())) { + ss << "pool " << poolstr << " snap " << snapname << " already exists"; } else { - const pg_pool_t *p = osdmap.get_pg_pool(pool); - pg_pool_t *pp = 0; - if (pending_inc.new_pools.count(pool)) - pp = &pending_inc.new_pools[pool]; - string snapname; - cmd_getval(g_ceph_context, cmdmap, "snap", snapname); - if (p->snap_exists(snapname.c_str()) || - (pp && pp->snap_exists(snapname.c_str()))) { - ss << "pool " << poolstr << " snap " << snapname << " already exists"; - err = 0; - } else { - if (!pp) { - pp = &pending_inc.new_pools[pool]; - *pp = *p; - } - pp->add_snap(snapname.c_str(), ceph_clock_now(g_ceph_context)); - pp->set_snap_epoch(pending_inc.epoch); - ss << "created pool " << poolstr << " snap " << snapname; - getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, get_last_committed())); - return true; - } + pp->add_snap(snapname.c_str(), ceph_clock_now(g_ceph_context)); + pp->set_snap_epoch(pending_inc.epoch); + ss << "created pool " << poolstr << " snap " << snapname; } + getline(ss, rs); + wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, get_last_committed())); + return true; } else if (prefix == "osd pool rmsnap") { string poolstr; cmd_getval(g_ceph_context, cmdmap, "pool", poolstr); @@ -3320,32 +3350,34 @@ bool OSDMonitor::prepare_command(MMonCommand *m) if (pool < 0) { ss << "unrecognized pool '" << poolstr << "'"; err = -ENOENT; + goto reply; + } + string snapname; + cmd_getval(g_ceph_context, cmdmap, "snap", snapname); + const pg_pool_t *p = osdmap.get_pg_pool(pool); + if (!p->snap_exists(snapname.c_str())) { + ss << "pool " << poolstr << 
" snap " << snapname << " does not exist"; + err = 0; + goto reply; + } + pg_pool_t *pp = 0; + if (pending_inc.new_pools.count(pool)) + pp = &pending_inc.new_pools[pool]; + if (!pp) { + pp = &pending_inc.new_pools[pool]; + *pp = *p; + } + snapid_t sn = pp->snap_exists(snapname.c_str()); + if (sn) { + pp->remove_snap(sn); + pp->set_snap_epoch(pending_inc.epoch); + ss << "removed pool " << poolstr << " snap " << snapname; } else { - const pg_pool_t *p = osdmap.get_pg_pool(pool); - pg_pool_t *pp = 0; - if (pending_inc.new_pools.count(pool)) - pp = &pending_inc.new_pools[pool]; - string snapname; - cmd_getval(g_ceph_context, cmdmap, "snap", snapname); - if (!p->snap_exists(snapname.c_str()) && - (!pp || !pp->snap_exists(snapname.c_str()))) { - ss << "pool " << poolstr << " snap " << snapname << " does not exist"; - err = 0; - } else { - if (!pp) { - pp = &pending_inc.new_pools[pool]; - *pp = *p; - } - snapid_t sn = pp->snap_exists(snapname.c_str()); - pp->remove_snap(sn); - pp->set_snap_epoch(pending_inc.epoch); - ss << "removed pool " << poolstr << " snap " << snapname; - getline(ss, rs); - wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, get_last_committed())); - return true; - } + ss << "already removed pool " << poolstr << " snap " << snapname; } - + getline(ss, rs); + wait_for_finished_proposal(new Monitor::C_Command(mon, m, 0, rs, get_last_committed())); + return true; } else if (prefix == "osd pool create") { int64_t pg_num; int64_t pgp_num; diff --git a/src/mon/OSDMonitor.h b/src/mon/OSDMonitor.h index 3d901a8f13963..d65532283210c 100644 --- a/src/mon/OSDMonitor.h +++ b/src/mon/OSDMonitor.h @@ -154,6 +154,7 @@ class OSDMonitor : public PaxosService { void encode_pending(MonitorDBStore::Transaction *t); virtual void encode_full(MonitorDBStore::Transaction *t); void on_active(); + void on_shutdown(); /** * do not let paxosservice periodically stash full osdmaps, or we will break our diff --git a/src/mon/PGMonitor.cc b/src/mon/PGMonitor.cc index 
365d836cebd0e..1f11d5486cf2f 100644 --- a/src/mon/PGMonitor.cc +++ b/src/mon/PGMonitor.cc @@ -1295,6 +1295,8 @@ void PGMonitor::dump_info(Formatter *f) f->open_object_section("pgmap"); pg_map.dump(f); f->close_section(); + + f->dump_unsigned("pgmap_first_committed", get_first_committed()); } bool PGMonitor::preprocess_command(MMonCommand *m) @@ -1435,8 +1437,8 @@ bool PGMonitor::preprocess_command(MMonCommand *m) if (f) { f->open_object_section("pg_map"); f->dump_stream("epoch") << mon->osdmon()->osdmap.get_epoch(); - f->dump_stream("pgid") << pgid; - f->dump_stream("mpgid") << mpgid; + f->dump_stream("raw_pgid") << pgid; + f->dump_stream("pgid") << mpgid; f->open_array_section("up"); for (vector::iterator it = up.begin(); it != up.end(); ++it) @@ -1659,6 +1661,26 @@ static void note_stuck_detail(enum PGMap::StuckPG what, } } +int PGMonitor::_warn_slow_request_histogram(const pow2_hist_t& h, string suffix, + list >& summary, + list > *detail) const +{ + unsigned sum = 0; + for (unsigned i = h.h.size() - 1; i > 0; --i) { + float ub = (float)(1 << i) / 1000.0; + if (ub < g_conf->mon_osd_max_op_age) + break; + ostringstream ss; + if (h.h[i]) { + ss << h.h[i] << " ops are blocked > " << ub << " sec" << suffix; + if (detail) + detail->push_back(make_pair(HEALTH_WARN, ss.str())); + sum += h.h[i]; + } + } + return sum; +} + void PGMonitor::get_health(list >& summary, list > *detail) const { @@ -1763,6 +1785,35 @@ void PGMonitor::get_health(list >& summary, } } + // slow requests + if (g_conf->mon_osd_max_op_age > 0 && + pg_map.osd_sum.op_queue_age_hist.upper_bound() > g_conf->mon_osd_max_op_age) { + unsigned sum = _warn_slow_request_histogram(pg_map.osd_sum.op_queue_age_hist, "", summary, detail); + if (sum > 0) { + ostringstream ss; + ss << sum << " requests are blocked > " << g_conf->mon_osd_max_op_age << " sec"; + summary.push_back(make_pair(HEALTH_WARN, ss.str())); + + unsigned num_slow_osds = 0; + if (detail) { + // do per-osd warnings + for (hash_map::const_iterator 
p = pg_map.osd_stat.begin(); + p != pg_map.osd_stat.end(); + ++p) { + if (_warn_slow_request_histogram(p->second.op_queue_age_hist, + string(" on osd.") + stringify(p->first), + summary, detail)) + ++num_slow_osds; + } + ostringstream ss2; + ss2 << num_slow_osds << " osds have slow requests"; + summary.push_back(make_pair(HEALTH_WARN, ss2.str())); + detail->push_back(make_pair(HEALTH_WARN, ss2.str())); + } + } + } + + // recovery stringstream rss; pg_map.recovery_summary(NULL, &rss); if (!rss.str().empty()) { @@ -1771,9 +1822,11 @@ void PGMonitor::get_health(list >& summary, detail->push_back(make_pair(HEALTH_WARN, "recovery " + rss.str())); } + // full/nearfull check_full_osd_health(summary, detail, pg_map.full_osds, "full", HEALTH_ERR); check_full_osd_health(summary, detail, pg_map.nearfull_osds, "near full", HEALTH_WARN); + // scrub if (pg_map.pg_sum.stats.sum.num_scrub_errors) { ostringstream ss; ss << pg_map.pg_sum.stats.sum.num_scrub_errors << " scrub errors"; diff --git a/src/mon/PGMonitor.h b/src/mon/PGMonitor.h index 271d0e1161d45..e8e1b4210aaaa 100644 --- a/src/mon/PGMonitor.h +++ b/src/mon/PGMonitor.h @@ -188,6 +188,10 @@ class PGMonitor : public PaxosService { void dump_info(Formatter *f); + int _warn_slow_request_histogram(const pow2_hist_t& h, string suffix, + list >& summary, + list > *detail) const; + void get_health(list >& summary, list > *detail) const; void check_full_osd_health(list >& summary, diff --git a/src/mon/Paxos.cc b/src/mon/Paxos.cc index d988c6415476a..ee2ba3b6fdb89 100644 --- a/src/mon/Paxos.cc +++ b/src/mon/Paxos.cc @@ -73,6 +73,16 @@ void Paxos::init() assert(is_consistent()); } +void Paxos::dump_info(Formatter *f) +{ + f->open_object_section("paxos"); + f->dump_unsigned("first_committed", first_committed); + f->dump_unsigned("last_committed", last_committed); + f->dump_unsigned("last_pn", last_pn); + f->dump_unsigned("accepted_pn", accepted_pn); + f->close_section(); +} + // --------------------------------- // PHASE 1 @@ -427,11 
+437,13 @@ void Paxos::handle_last(MMonPaxos *last) dout(10) << "that's everyone. active!" << dendl; extend_lease(); - finish_proposal(); + if (do_refresh()) { + finish_round(); - finish_contexts(g_ceph_context, waiting_for_active); - finish_contexts(g_ceph_context, waiting_for_readable); - finish_contexts(g_ceph_context, waiting_for_writeable); + finish_contexts(g_ceph_context, waiting_for_active); + finish_contexts(g_ceph_context, waiting_for_readable); + finish_contexts(g_ceph_context, waiting_for_writeable); + } } } } else { @@ -507,11 +519,15 @@ void Paxos::begin(bufferlist& v) if (mon->get_quorum().size() == 1) { // we're alone, take it easy commit(); - finish_proposal(); - finish_contexts(g_ceph_context, waiting_for_active); - finish_contexts(g_ceph_context, waiting_for_commit); - finish_contexts(g_ceph_context, waiting_for_readable); - finish_contexts(g_ceph_context, waiting_for_writeable); + if (do_refresh()) { + assert(is_updating()); // we can't be updating-previous with quorum of 1 + commit_proposal(); + finish_round(); + finish_contexts(g_ceph_context, waiting_for_active); + finish_contexts(g_ceph_context, waiting_for_commit); + finish_contexts(g_ceph_context, waiting_for_readable); + finish_contexts(g_ceph_context, waiting_for_writeable); + } return; } @@ -589,14 +605,12 @@ void Paxos::handle_accept(MMonPaxos *accept) if (accept->pn != accepted_pn) { // we accepted a higher pn, from some other leader dout(10) << " we accepted a higher pn " << accepted_pn << ", ignoring" << dendl; - accept->put(); - return; + goto out; } if (last_committed > 0 && accept->last_committed < last_committed-1) { dout(10) << " this is from an old round, ignoring" << dendl; - accept->put(); - return; + goto out; } assert(accept->last_committed == last_committed || // not committed accept->last_committed == last_committed-1); // committed @@ -612,6 +626,11 @@ void Paxos::handle_accept(MMonPaxos *accept) // note: this may happen before the lease is reextended (below) dout(10) 
<< " got majority, committing" << dendl; commit(); + if (!do_refresh()) + goto out; + if (is_updating()) + commit_proposal(); + finish_contexts(g_ceph_context, waiting_for_commit); } // done? @@ -624,14 +643,15 @@ void Paxos::handle_accept(MMonPaxos *accept) // yay! extend_lease(); - finish_proposal(); + finish_round(); // wake people up finish_contexts(g_ceph_context, waiting_for_active); - finish_contexts(g_ceph_context, waiting_for_commit); finish_contexts(g_ceph_context, waiting_for_readable); finish_contexts(g_ceph_context, waiting_for_writeable); } + + out: accept->put(); } @@ -787,44 +807,48 @@ void Paxos::warn_on_future_time(utime_t t, entity_name_t from) } -void Paxos::finish_proposal() +bool Paxos::do_refresh() { - assert(mon->is_leader()); + bool need_bootstrap = false; // make sure we have the latest state loaded up - bool need_bootstrap = false; mon->refresh_from_paxos(&need_bootstrap); - // ok, now go active! - state = STATE_ACTIVE; + if (need_bootstrap) { + dout(10) << " doing requested bootstrap" << dendl; + mon->bootstrap(); + return false; + } - // finish off the last proposal - if (!proposals.empty()) { - assert(mon->is_leader()); + return true; +} - C_Proposal *proposal = static_cast(proposals.front()); - if (!proposal->proposed) { - dout(10) << __func__ << " proposal " << proposal << ": we must have received a stay message and we're " - << "trying to finish before time. " - << "Instead, propose it (if we are active)!" 
<< dendl; - } else { - dout(10) << __func__ << " proposal " << proposal << " took " - << (ceph_clock_now(NULL) - proposal->proposal_time) - << " to finish" << dendl; - proposals.pop_front(); - proposal->complete(0); - } - } +void Paxos::commit_proposal() +{ + dout(10) << __func__ << dendl; + assert(mon->is_leader()); + assert(!proposals.empty()); + assert(is_updating()); + + C_Proposal *proposal = static_cast(proposals.front()); + assert(proposal->proposed); + dout(10) << __func__ << " proposal " << proposal << " took " + << (ceph_clock_now(NULL) - proposal->proposal_time) + << " to finish" << dendl; + proposals.pop_front(); + proposal->complete(0); +} + +void Paxos::finish_round() +{ + assert(mon->is_leader()); + + // ok, now go active! + state = STATE_ACTIVE; dout(10) << __func__ << " state " << state << " proposals left " << proposals.size() << dendl; - if (need_bootstrap) { - dout(10) << " doing requested bootstrap" << dendl; - mon->bootstrap(); - return; - } - if (should_trim()) { trim(); } diff --git a/src/mon/Paxos.h b/src/mon/Paxos.h index 1cdad50e5bbed..cab27f289a882 100644 --- a/src/mon/Paxos.h +++ b/src/mon/Paxos.h @@ -983,7 +983,19 @@ class Paxos { * Begin proposing the Proposal at the front of the proposals queue. */ void propose_queued(); - void finish_proposal(); + + /** + * refresh state from store + * + * Called when we have new state for the mon to consume. If we return false, + * abort (we triggered a bootstrap). + * + * @returns true on success, false if we are now bootstrapping + */ + bool do_refresh(); + + void commit_proposal(); + void finish_round(); public: /** @@ -1018,6 +1030,12 @@ class Paxos { void read_and_prepare_transactions(MonitorDBStore::Transaction *tx, version_t from, version_t last); void init(); + + /** + * dump state info to a formatter + */ + void dump_info(Formatter *f); + /** * This function runs basic consistency checks. Importantly, if * it is inconsistent and shouldn't be, it asserts out. 
diff --git a/src/mon/PaxosService.cc b/src/mon/PaxosService.cc index efe60aafd691a..d6e67a1c4b4c0 100644 --- a/src/mon/PaxosService.cc +++ b/src/mon/PaxosService.cc @@ -163,10 +163,9 @@ void PaxosService::propose_pending() { dout(10) << "propose_pending" << dendl; assert(have_pending); + assert(!proposing); assert(mon->is_leader()); assert(is_active()); - if (!is_active()) - return; if (proposal_timer) { dout(10) << " canceling proposal_timer " << proposal_timer << dendl; @@ -232,6 +231,12 @@ void PaxosService::restart() finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); + if (have_pending) { + discard_pending(); + have_pending = false; + } + proposing = false; + on_restart(); } @@ -239,18 +244,6 @@ void PaxosService::election_finished() { dout(10) << "election_finished" << dendl; - if (proposal_timer) { - dout(10) << " canceling proposal_timer " << proposal_timer << dendl; - mon->timer.cancel_event(proposal_timer); - proposal_timer = 0; - } - - if (have_pending) { - discard_pending(); - have_pending = false; - } - proposing = false; - finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); // make sure we update our state @@ -320,6 +313,8 @@ void PaxosService::shutdown() } finish_contexts(g_ceph_context, waiting_for_finished_proposal, -EAGAIN); + + on_shutdown(); } void PaxosService::maybe_trim() diff --git a/src/mon/PaxosService.h b/src/mon/PaxosService.h index 6c8d9c0bcc48c..a5761d19ad837 100644 --- a/src/mon/PaxosService.h +++ b/src/mon/PaxosService.h @@ -445,6 +445,11 @@ class PaxosService { */ virtual void on_active() { } + /** + * This is called when we are shutting down + */ + virtual void on_shutdown() {} + /** * this is called when activating on the leader * diff --git a/src/msg/Accepter.cc b/src/msg/Accepter.cc index 4d13be8fdca35..d6e94d1cc515c 100644 --- a/src/msg/Accepter.cc +++ b/src/msg/Accepter.cc @@ -165,6 +165,11 @@ int Accepter::rebind(const set& avoid_ports) new_avoid.insert(addr.get_port()); 
addr.set_port(0); + // adjust the nonce; we want our entity_addr_t to be truly unique. + nonce += 1000000; + msgr->my_inst.addr.nonce = nonce; + ldout(msgr->cct,10) << " new nonce " << nonce << " and inst " << msgr->my_inst << dendl; + ldout(msgr->cct,10) << " will try " << addr << " and avoid ports " << new_avoid << dendl; int r = bind(addr, new_avoid); if (r == 0) diff --git a/src/msg/Message.h b/src/msg/Message.h index ad2bc8249b744..3ed8ee667d242 100644 --- a/src/msg/Message.h +++ b/src/msg/Message.h @@ -172,7 +172,9 @@ struct Connection : private RefCountedObject { RefCountedObject *priv; int peer_type; entity_addr_t peer_addr; +private: uint64_t features; +public: RefCountedObject *pipe; bool failed; /// true if we are a lossy connection that has failed. diff --git a/src/msg/Messenger.h b/src/msg/Messenger.h index 28643e1076748..42feaf227dfd8 100644 --- a/src/msg/Messenger.h +++ b/src/msg/Messenger.h @@ -329,17 +329,13 @@ class Messenger { */ virtual int bind(const entity_addr_t& bind_addr) = 0; /** - * This is an optional function for implementations - * to override. For those implementations that do - * implement it, this function shall perform a full - * restart of the Messenger component, whatever that means. - * Other entities who connect to this Messenger post-rebind() - * should perceive it as a new entity which they have not - * previously contacted, and it MUST bind to a different - * address than it did previously. If avoid_port is non-zero - * it must additionally avoid that port. - * - * @param avoid_port An additional port to avoid binding to. + * This function performs a full restart of the Messenger component, + * whatever that means. Other entities who connect to this + * Messenger post-rebind() should perceive it as a new entity which + * they have not previously contacted, and it MUST bind to a + * different address than it did previously. + * + * @param avoid_ports Additional port to avoid binding to. 
*/ virtual int rebind(const set& avoid_ports) { return -EOPNOTSUPP; } /** @@ -392,6 +388,9 @@ class Messenger { * when you pass it in. * @param dest The entity to send the Message to. * + * DEPRECATED: please do not use this interface for any new code; + * use the Connection* variant. + * * @return 0 on success, or -errno on failure. */ virtual int send_message(Message *m, const entity_inst_t& dest) = 0; @@ -421,6 +420,9 @@ class Messenger { * when you pass it in. * @param dest The entity to send the Message to. * + * DEPRECATED: please do not use this interface for any new code; + * use the Connection* variant. + * * @return 0. */ virtual int lazy_send_message(Message *m, const entity_inst_t& dest) = 0; @@ -476,22 +478,33 @@ class Messenger { */ virtual int send_keepalive(Connection *con) = 0; /** - * Mark down a Connection to a remote. This will cause us to - * discard our outgoing queue for them, and if they try - * to reconnect they will discard their queue when we - * inform them of the session reset. If there is no - * Connection to the given dest, it is a no-op. - * It does not generate any notifications to the Dispatcher. + * Mark down a Connection to a remote. + * + * This will cause us to discard our outgoing queue for them, and if + * reset detection is enabled in the policy and the endpoint tries + * to reconnect they will discard their queue when we inform them of + * the session reset. + * + * If there is no Connection to the given dest, it is a no-op. + * + * This generates a RESET notification to the Dispatcher. + * + * DEPRECATED: please do not use this interface for any new code; + * use the Connection* variant. * * @param a The address to mark down. */ virtual void mark_down(const entity_addr_t& a) = 0; /** - * Mark down the given Connection. This will cause us to - * discard its outgoing queue, and if the endpoint tries - * to reconnect they will discard their queue when we - * inform them of the session reset. 
+ * Mark down the given Connection. + * + * This will cause us to discard its outgoing queue, and if reset + * detection is enabled in the policy and the endpoint tries to + * reconnect they will discard their queue when we inform them of + * the session reset. + * * If the Connection* is NULL, this is a no-op. + * * It does not generate any notifications to the Dispatcher. * * @param con The Connection to mark down. @@ -500,6 +513,14 @@ class Messenger { void mark_down(const ConnectionRef& con) { mark_down(con.get()); } + /** + * Mark all the existing Connections down. This is equivalent + * to iterating over all Connections and calling mark_down() + * on each. + * + * This will generate a RESET event for each closed connections. + */ + virtual void mark_down_all() = 0; /** * Unlike mark_down, this function will try and deliver * all messages before ending the connection, and it will use @@ -529,12 +550,6 @@ class Messenger { * @param con The Connection to mark as disposable. */ virtual void mark_disposable(Connection *con) = 0; - /** - * Mark all the existing Connections down. This is equivalent - * to iterating over all Connections and calling mark_down() - * on each. 
- */ - virtual void mark_down_all() = 0; /** * @} // Connection Management */ diff --git a/src/msg/Pipe.cc b/src/msg/Pipe.cc index 6714b9f7b6e2b..6f271c812f386 100644 --- a/src/msg/Pipe.cc +++ b/src/msg/Pipe.cc @@ -210,12 +210,11 @@ void *Pipe::DelayedDelivery::entry() int Pipe::accept() { ldout(msgr->cct,10) << "accept" << dendl; + assert(pipe_lock.is_locked()); + assert(state == STATE_ACCEPTING); - set_socket_options(); + pipe_lock.Unlock(); - // my creater gave me sd via accept() - assert(state == STATE_ACCEPTING); - // vars bufferlist addrs; entity_addr_t socket_addr; @@ -232,6 +231,7 @@ int Pipe::accept() uint64_t feat_missing; bool replaced = false; CryptoKey session_key; + int removed; // single-use down below // this should roughly mirror pseudocode at // http://ceph.newdream.net/wiki/Messaging_protocol @@ -241,6 +241,8 @@ int Pipe::accept() // used for reading in the remote acked seq on connect uint64_t newly_acked_seq = 0; + set_socket_options(); + // announce myself. r = tcp_write(CEPH_BANNER, strlen(CEPH_BANNER)); if (r < 0) { @@ -311,6 +313,8 @@ int Pipe::accept() goto fail_unlocked; } + // sanitize features + connect.features = ceph_sanitize_features(connect.features); authorizer.clear(); if (connect.authorizer_len) { @@ -328,25 +332,32 @@ int Pipe::accept() << dendl; msgr->lock.Lock(); // FIXME + pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; + if (state != STATE_ACCEPTING) { + goto shutting_down; + } // note peer's type, flags set_peer_type(connect.host_type); policy = msgr->get_policy(connect.host_type); ldout(msgr->cct,10) << "accept of host_type " << connect.host_type - << ", policy.lossy=" << policy.lossy - << dendl; + << ", policy.lossy=" << policy.lossy + << " policy.server=" << policy.server + << " policy.standby=" << policy.standby + << " policy.resetcheck=" << policy.resetcheck + << dendl; memset(&reply, 0, sizeof(reply)); reply.protocol_version = msgr->get_proto_version(peer_type, false); + msgr->lock.Unlock(); // 
mismatch? ldout(msgr->cct,10) << "accept my proto " << reply.protocol_version << ", their proto " << connect.protocol_version << dendl; if (connect.protocol_version != reply.protocol_version) { reply.tag = CEPH_MSGR_TAG_BADPROTOVER; - msgr->lock.Unlock(); goto reply; } @@ -372,18 +383,20 @@ int Pipe::accept() if (feat_missing) { ldout(msgr->cct,1) << "peer missing required features " << std::hex << feat_missing << std::dec << dendl; reply.tag = CEPH_MSGR_TAG_FEATURES; - msgr->lock.Unlock(); goto reply; } - msgr->lock.Unlock(); - // Check the authorizer. If not good, bail out. + pipe_lock.Unlock(); + if (!msgr->verify_authorizer(connection_state.get(), peer_type, connect.authorizer_protocol, authorizer, authorizer_reply, authorizer_valid, session_key) || !authorizer_valid) { ldout(msgr->cct,0) << "accept: got bad authorizer" << dendl; + pipe_lock.Lock(); + if (state != STATE_ACCEPTING) + goto shutting_down_msgr_unlocked; reply.tag = CEPH_MSGR_TAG_BADAUTHORIZER; delete session_security; session_security = NULL; @@ -395,14 +408,16 @@ int Pipe::accept() ldout(msgr->cct,10) << "accept: setting up session_security." << dendl; msgr->lock.Lock(); + pipe_lock.Lock(); if (msgr->dispatch_queue.stop) goto shutting_down; - + if (state != STATE_ACCEPTING) + goto shutting_down; // existing? 
existing = msgr->_lookup_pipe(peer_addr); if (existing) { - existing->pipe_lock.Lock(); + existing->pipe_lock.Lock(true); // skip lockdep check (we are locking a second Pipe here) if (connect.global_seq < existing->peer_global_seq) { ldout(msgr->cct,10) << "accept existing " << existing << ".gseq " << existing->peer_global_seq @@ -526,6 +541,8 @@ int Pipe::accept() assert(0); retry_session: + assert(existing->pipe_lock.is_locked()); + assert(pipe_lock.is_locked()); reply.tag = CEPH_MSGR_TAG_RETRY_SESSION; reply.connect_seq = existing->connect_seq + 1; existing->pipe_lock.Unlock(); @@ -533,8 +550,10 @@ int Pipe::accept() goto reply; reply: + assert(pipe_lock.is_locked()); reply.features = ((uint64_t)connect.features & policy.features_supported) | policy.features_required; reply.authorizer_len = authorizer_reply.length(); + pipe_lock.Unlock(); r = tcp_write((char*)&reply, sizeof(reply)); if (r < 0) goto fail_unlocked; @@ -546,6 +565,8 @@ int Pipe::accept() } replace: + assert(existing->pipe_lock.is_locked()); + assert(pipe_lock.is_locked()); if (connect.features & CEPH_FEATURE_RECONNECT_SEQ) { reply_tag = CEPH_MSGR_TAG_SEQ; existing_seq = existing->in_seq; @@ -596,8 +617,10 @@ int Pipe::accept() open: // open + assert(pipe_lock.is_locked()); connect_seq = connect.connect_seq + 1; peer_global_seq = connect.global_seq; + assert(state == STATE_ACCEPTING); state = STATE_OPEN; ldout(msgr->cct,10) << "accept success, connect_seq = " << connect_seq << ", sending READY" << dendl; @@ -624,8 +647,11 @@ int Pipe::accept() // ok! 
if (msgr->dispatch_queue.stop) goto shutting_down; + removed = msgr->accepting_pipes.erase(this); + assert(removed == 1); register_pipe(); msgr->lock.Unlock(); + pipe_lock.Unlock(); r = tcp_write((char*)&reply, sizeof(reply)); if (r < 0) { @@ -657,7 +683,6 @@ int Pipe::accept() start_writer(); } ldout(msgr->cct,20) << "accept done" << dendl; - pipe_lock.Unlock(); maybe_start_delay_thread(); @@ -690,11 +715,12 @@ int Pipe::accept() if (queued || replaced) start_writer(); } - pipe_lock.Unlock(); return -1; shutting_down: msgr->lock.Unlock(); + shutting_down_msgr_unlocked: + assert(pipe_lock.is_locked()); if (msgr->cct->_conf->ms_inject_internal_delays) { ldout(msgr->cct, 10) << " sleep for " << msgr->cct->_conf->ms_inject_internal_delays << dendl; @@ -703,11 +729,9 @@ int Pipe::accept() t.sleep(); } - pipe_lock.Lock(); state = STATE_CLOSED; state_closed.set(1); fault(); - pipe_lock.Unlock(); return -1; } @@ -899,12 +923,17 @@ int Pipe::connect() ldout(msgr->cct,2) << "connect read reply " << strerror_r(errno, buf, sizeof(buf)) << dendl; goto fail; } + + // sanitize features + reply.features = ceph_sanitize_features(reply.features); + ldout(msgr->cct,20) << "connect got reply tag " << (int)reply.tag - << " connect_seq " << reply.connect_seq - << " global_seq " << reply.global_seq - << " proto " << reply.protocol_version - << " flags " << (int)reply.flags - << dendl; + << " connect_seq " << reply.connect_seq + << " global_seq " << reply.global_seq + << " proto " << reply.protocol_version + << " flags " << (int)reply.flags + << " features " << reply.features + << dendl; authorizer_reply.clear(); @@ -1007,6 +1036,8 @@ int Pipe::connect() ldout(msgr->cct,2) << "connect read error on newly_acked_seq" << dendl; goto fail_locked; } + ldout(msgr->cct,2) << " got newly_acked_seq " << newly_acked_seq + << " vs out_seq " << out_seq << dendl; while (newly_acked_seq > out_seq) { Message *m = _get_next_outgoing(); assert(m); @@ -1101,6 +1132,7 @@ void Pipe::unregister_pipe() 
msgr->rank_pipe.erase(p); } else { ldout(msgr->cct,10) << "unregister_pipe - not registered" << dendl; + msgr->accepting_pipes.erase(this); // somewhat overkill, but safe. } } @@ -1124,6 +1156,8 @@ void Pipe::requeue_sent() void Pipe::discard_requeued_up_to(uint64_t seq) { ldout(msgr->cct, 10) << "discard_requeued_up_to " << seq << dendl; + if (out_q.count(CEPH_MSG_PRIO_HIGHEST) == 0) + return; list& rq = out_q[CEPH_MSG_PRIO_HIGHEST]; while (!rq.empty()) { Message *m = rq.front(); @@ -1135,6 +1169,8 @@ void Pipe::discard_requeued_up_to(uint64_t seq) rq.pop_front(); out_seq++; } + if (rq.empty()) + out_q.erase(CEPH_MSG_PRIO_HIGHEST); } /* @@ -1305,11 +1341,13 @@ void Pipe::stop() */ void Pipe::reader() { - if (state == STATE_ACCEPTING) - accept(); - pipe_lock.Lock(); + if (state == STATE_ACCEPTING) { + accept(); + assert(pipe_lock.is_locked()); + } + // loop. while (state != STATE_CLOSED && state != STATE_CONNECTING) { @@ -1527,10 +1565,16 @@ void Pipe::writer() // associate message with Connection (for benefit of encode_payload) m->set_connection(connection_state.get()); - ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " " << m << " " << *m << dendl; + uint64_t features = connection_state->get_features(); + if (m->empty_payload()) + ldout(msgr->cct,20) << "writer encoding " << m->get_seq() << " features " << features + << " " << m << " " << *m << dendl; + else + ldout(msgr->cct,20) << "writer half-reencoding " << m->get_seq() << " features " << features + << " " << m << " " << *m << dendl; // encode and copy out of *m - m->encode(connection_state->get_features(), !msgr->cct->_conf->ms_nocrc); + m->encode(features, !msgr->cct->_conf->ms_nocrc); // prepare everything ceph_msg_header& header = m->get_header(); diff --git a/src/msg/Pipe.h b/src/msg/Pipe.h index b359bc2caf72f..5f94305350c3e 100644 --- a/src/msg/Pipe.h +++ b/src/msg/Pipe.h @@ -247,14 +247,17 @@ class DispatchQueue; void stop(); void _send(Message *m) { + assert(pipe_lock.is_locked()); 
out_q[m->get_priority()].push_back(m); cond.Signal(); } void _send_keepalive() { + assert(pipe_lock.is_locked()); keepalive = true; cond.Signal(); } Message *_get_next_outgoing() { + assert(pipe_lock.is_locked()); Message *m = 0; while (!m && !out_q.empty()) { map >::reverse_iterator p = out_q.rbegin(); diff --git a/src/msg/SimpleMessenger.cc b/src/msg/SimpleMessenger.cc index 48e37d8709840..441ed432af00b 100644 --- a/src/msg/SimpleMessenger.cc +++ b/src/msg/SimpleMessenger.cc @@ -276,9 +276,10 @@ int SimpleMessenger::bind(const entity_addr_t &bind_addr) int SimpleMessenger::rebind(const set& avoid_ports) { ldout(cct,1) << "rebind avoid " << avoid_ports << dendl; - mark_down_all(); assert(did_bind); - return accepter.rebind(avoid_ports); + int r = accepter.rebind(avoid_ports); + mark_down_all(); + return r; } int SimpleMessenger::start() @@ -311,6 +312,7 @@ Pipe *SimpleMessenger::add_accept_pipe(int sd) p->start_reader(); p->pipe_lock.Unlock(); pipes.insert(p); + accepting_pipes.insert(p); lock.Unlock(); return p; } @@ -559,6 +561,18 @@ void SimpleMessenger::mark_down_all() { ldout(cct,1) << "mark_down_all" << dendl; lock.Lock(); + for (set::iterator q = accepting_pipes.begin(); q != accepting_pipes.end(); ++q) { + Pipe *p = *q; + ldout(cct,5) << "mark_down_all accepting_pipe " << p << dendl; + p->pipe_lock.Lock(); + p->stop(); + ConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); + p->pipe_lock.Unlock(); + } + accepting_pipes.clear(); + while (!rank_pipe.empty()) { hash_map::iterator it = rank_pipe.begin(); Pipe *p = it->second; @@ -585,9 +599,12 @@ void SimpleMessenger::mark_down(const entity_addr_t& addr) p->pipe_lock.Lock(); p->stop(); if (p->connection_state) { - // do not generate a reset event for the caller in this case, - // since they asked for it. 
- p->connection_state->clear_pipe(p); + // generate a reset event for the caller in this case, even + // though they asked for it, since this is the addr-based (and + // not Connection* based) interface + ConnectionRef con = p->connection_state; + if (con && con->clear_pipe(p)) + dispatch_queue.queue_reset(con.get()); } p->pipe_lock.Unlock(); } else { diff --git a/src/msg/SimpleMessenger.h b/src/msg/SimpleMessenger.h index 47ee145aa5e12..6860c6c21a3a7 100644 --- a/src/msg/SimpleMessenger.h +++ b/src/msg/SimpleMessenger.h @@ -92,28 +92,12 @@ class SimpleMessenger : public Messenger { /** @defgroup Accessors * @{ */ - /** - * Set the IP this SimpleMessenger is using. This is useful if it's unset - * but another SimpleMessenger on the same interface has already learned its - * IP. Of course, this function does not change the port, since the - * SimpleMessenger always knows the correct setting for that. - * If the SimpleMesssenger's IP is already set, this function is a no-op. - * - * @param addr The IP address to set internally. - */ void set_addr_unknowns(entity_addr_t& addr); - /** - * Get the number of Messages which the SimpleMessenger has received - * but not yet dispatched. - * @return The length of the Dispatch queue. - */ + int get_dispatch_queue_len() { return dispatch_queue.get_queue_len(); } - /** - * Get age of oldest undelivered message - * (0 if the queue is empty) - */ + double get_dispatch_queue_max_age(utime_t now) { return dispatch_queue.get_max_age(now); } @@ -123,52 +107,21 @@ class SimpleMessenger : public Messenger { * @defgroup Configuration functions * @{ */ - /** - * Set the cluster protocol in use by this daemon. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param p The cluster protocol to use. Defined externally. 
- */ void set_cluster_protocol(int p) { assert(!started && !did_bind); cluster_protocol = p; } - /** - * Set a policy which is applied to all peers who do not have a type-specific - * Policy. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param p The Policy to apply. - */ + void set_default_policy(Policy p) { Mutex::Locker l(policy_lock); default_policy = p; } - /** - * Set a policy which is applied to all peers of the given type. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param type The peer type this policy applies to. - * @param p The policy to apply. - */ + void set_policy(int type, Policy p) { Mutex::Locker l(policy_lock); policy_map[type] = p; } - /** - * Set a Throttler which is applied to all Messages from the given - * type of peer. - * This is an init-time function and cannot be called after calling - * start() or bind(). - * - * @param type The peer type this Throttler will apply to. - * @param t The Throttler to apply. SimpleMessenger does not take - * ownership of this pointer, but you must not destroy it before - * you destroy SimpleMessenger. - */ + void set_policy_throttlers(int type, Throttle *byte_throttle, Throttle *msg_throttle) { Mutex::Locker l(policy_lock); if (policy_map.count(type)) { @@ -179,50 +132,18 @@ class SimpleMessenger : public Messenger { default_policy.throttler_messages = msg_throttle; } } - /** - * Bind the SimpleMessenger to a specific address. If bind_addr - * is not completely filled in the system will use the - * valid portions and cycle through the unset ones (eg, the port) - * in an unspecified order. - * - * @param bind_addr The address to bind to. - * @return 0 on success, or -1 if the SimpleMessenger is already running, or - * -errno if an error is returned from a system call. - */ + int bind(const entity_addr_t& bind_addr); - /** - * This function performs a full restart of the SimpleMessenger. 
It - * calls mark_down_all() and binds to a new port. (If avoid_port - * is set it additionally avoids that specific port.) - * - * @param avoid_port An additional port to avoid binding to. - */ int rebind(const set& avoid_ports); + /** @} Configuration functions */ /** * @defgroup Startup/Shutdown * @{ */ - /** - * Start up the SimpleMessenger. Create worker threads as necessary. - * @return 0 - */ virtual int start(); - /** - * Wait until the SimpleMessenger is ready to shut down (triggered by a - * call to the shutdown() function), then handle - * stopping its threads and cleaning up Pipes and various queues. - * Once this function returns, the SimpleMessenger is fully shut down and - * can be deleted. - */ virtual void wait(); - /** - * Tell the SimpleMessenger to shut down. This function does not - * complete the shutdown; it just triggers it. - * - * @return 0 - */ virtual int shutdown(); /** @} // Startup/Shutdown */ @@ -231,60 +152,18 @@ class SimpleMessenger : public Messenger { * @defgroup Messaging * @{ */ - /** - * Queue the given Message for the given entity. - * Success in this function does not guarantee Message delivery, only - * success in queueing the Message. Other guarantees may be provided based - * on the Connection policy associated with the dest. - * - * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. - * @param dest The entity to send the Message to. - * - * @return 0 on success, or -EINVAL if the dest's address is empty. - */ virtual int send_message(Message *m, const entity_inst_t& dest) { return _send_message(m, dest, false); } - /** - * Queue the given Message to send out on the given Connection. - * Success in this function does not guarantee Message delivery, only - * success in queueing the Message (or else a guaranteed-safe drop). - * Other guarantees may be provided based on the Connection policy. - * - * @param m The Message to send. 
The Messenger consumes a single reference - * when you pass it in. - * @param con The Connection to send the Message out on. - * - * @return 0 on success. - */ + virtual int send_message(Message *m, Connection *con) { return _send_message(m, con, false); } - /** - * Lazily queue the given Message for the given entity. Unlike with - * send_message(), lazy_send_message() will not establish a - * Connection if none exists, re-establish the connection if it - * has broken, or queue the Message if the connection is broken. - * - * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. - * @param dest The entity to send the Message to. - * - * @return 0 on success, or -EINVAL if the dest's address is empty. - */ + virtual int lazy_send_message(Message *m, const entity_inst_t& dest) { return _send_message(m, dest, true); } - /** - * Lazily queue the given Message for the given Connection. - * - * @param m The Message to send. The Messenger consumes a single reference - * when you pass it in. - * @param con The Connection to send the Message out on. - * - * @return 0. - */ + virtual int lazy_send_message(Message *m, Connection *con) { return _send_message(m, con, true); } @@ -294,90 +173,14 @@ class SimpleMessenger : public Messenger { * @defgroup Connection Management * @{ */ - /** - * Get the Connection object associated with a given entity. If a - * Connection does not exist, create one and establish a logical connection. - * The caller owns a reference when this returns. Call ->put() when you're - * done! - * - * @param dest The entity to get a connection for. - * @return The requested Connection, as a pointer whose reference you own. - */ virtual ConnectionRef get_connection(const entity_inst_t& dest); virtual ConnectionRef get_loopback_connection(); - /** - * Send a "keepalive" ping to the given dest, if it has a working Connection. 
- * If the Messenger doesn't already have a Connection, or if the underlying - * connection has broken, this function does nothing. - * - * @param dest The entity to send the keepalive to. - * @return 0, or -EINVAL if we don't already have a Connection, or - * -EPIPE if a Pipe for the dest doesn't exist. - */ virtual int send_keepalive(const entity_inst_t& addr); - /** - * Send a "keepalive" ping along the given Connection, if it's working. - * If the underlying connection has broken, this function does nothing. - * - * @param dest The entity to send the keepalive to. - * @return 0, or -EPIPE if the Connection doesn't have a running Pipe. - */ virtual int send_keepalive(Connection *con); - /** - * Mark down a Connection to a remote. This will cause us to - * discard our outgoing queue for them, and if they try - * to reconnect they will discard their queue when we - * inform them of the session reset. If there is no - * Connection to the given dest, it is a no-op. - * It does not generate any notifications to the Dispatcher. - * - * @param a The address to mark down. - */ virtual void mark_down(const entity_addr_t& addr); - /** - * Mark down the given Connection. This will cause us to - * discard its outgoing queue, and if the endpoint tries - * to reconnect they will discard their queue when we - * inform them of the session reset. - * It does not generate any notifications to the Dispatcher. - * - * @param con The Connection to mark down. - */ virtual void mark_down(Connection *con); - /** - * Unlike mark_down, this function will try and deliver - * all messages before ending the connection, and it will use - * the Pipe's existing semantics to do so. Once the Messages - * all been sent out (and acked, if using reliable delivery) - * the Connection will be closed. - * This function means that you will get standard delivery to endpoints, - * and then the Connection will be cleaned up. It does not - * generate any notifications to the Dispatcher. 
- * - * @param con The Connection to mark down. - */ virtual void mark_down_on_empty(Connection *con); - /** - * Mark a Connection as "disposable", setting it to lossy - * (regardless of initial Policy). Unlike mark_down_on_empty() - * this does not immediately close the Connection once - * Messages have been delivered, so as long as there are no errors you can - * continue to receive responses; but it will not attempt - * to reconnect for message delivery or preserve your old - * delivery semantics, either. - * You can compose this with mark_down, in which case the Pipe - * will make sure to send all Messages and wait for an ack before - * closing, but if there's a failure it will simply shut down. It - * does not generate any notifications to the Dispatcher. - * - * @param con The Connection to mark as disposable. - */ virtual void mark_disposable(Connection *con); - /** - * Mark all the existing Connections down. This is equivalent - * to iterating over all Connections and calling mark_down() - * on each. - */ virtual void mark_down_all(); /** @} // Connection Management */ protected: @@ -514,6 +317,12 @@ class SimpleMessenger : public Messenger { * invalid and can be replaced by anyone holding the msgr lock */ hash_map rank_pipe; + /** + * list of pipes that are in the process of accepting + * + * These are not yet in the rank_pipe map. 
+ */ + set accepting_pipes; /// a set of all the Pipes we have which are somehow active set pipes; /// a list of Pipes we want to tear down diff --git a/src/os/FileStore.cc b/src/os/FileStore.cc index 70f182a61e00c..1363eff27d1f7 100644 --- a/src/os/FileStore.cc +++ b/src/os/FileStore.cc @@ -99,6 +99,7 @@ static const __SWORD_TYPE XFS_SUPER_MAGIC(0x58465342); #define CLUSTER_SNAP_ITEM "clustersnap_%s" #define REPLAY_GUARD_XATTR "user.cephos.seq" +#define GLOBAL_REPLAY_GUARD_XATTR "user.cephos.gseq" /* * long file names will have the following format: @@ -467,6 +468,7 @@ FileStore::FileStore(const std::string &base, const std::string &jdev, const cha plb.add_time_avg(l_os_commit_len, "commitcycle_interval"); plb.add_time_avg(l_os_commit_lat, "commitcycle_latency"); plb.add_u64_counter(l_os_j_full, "journal_full"); + plb.add_time_avg(l_os_queue_lat, "queue_transaction_latency_avg"); logger = plb.create_perf_counters(); @@ -1954,6 +1956,7 @@ void FileStore::op_queue_reserve_throttle(Op *o) logger->set(l_os_oq_max_ops, max_ops); logger->set(l_os_oq_max_bytes, max_bytes); + utime_t start = ceph_clock_now(g_ceph_context); { Mutex::Locker l(op_throttle_lock); while ((max_ops && (op_queue_len + 1) > max_ops) || @@ -1967,6 +1970,8 @@ void FileStore::op_queue_reserve_throttle(Op *o) op_queue_len++; op_queue_bytes += o->bytes; } + utime_t end = ceph_clock_now(g_ceph_context); + logger->tinc(l_os_queue_lat, end - start); logger->set(l_os_oq_ops, op_queue_len); logger->set(l_os_oq_bytes, op_queue_bytes); @@ -2178,6 +2183,78 @@ int FileStore::_do_transactions( return r; } +void FileStore::_set_global_replay_guard(coll_t cid, + const SequencerPosition &spos) +{ + if (btrfs_stable_commits) + return; + + // sync all previous operations on this sequencer + sync_filesystem(basedir_fd); + + char fn[PATH_MAX]; + get_cdir(cid, fn, sizeof(fn)); + int fd = ::open(fn, O_RDONLY); + if (fd < 0) { + int err = errno; + derr << __func__ << ": " << cid << " error " << cpp_strerror(err) << dendl; 
+ assert(0 == "_set_global_replay_guard failed"); + } + + _inject_failure(); + + // then record that we did it + bufferlist v; + ::encode(spos, v); + int r = chain_fsetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, v.c_str(), v.length()); + if (r < 0) { + derr << __func__ << ": fsetxattr " << GLOBAL_REPLAY_GUARD_XATTR + << " got " << cpp_strerror(r) << dendl; + assert(0 == "fsetxattr failed"); + } + + // and make sure our xattr is durable. + ::fsync(fd); + + _inject_failure(); + + TEMP_FAILURE_RETRY(::close(fd)); + dout(10) << __func__ << ": " << spos << " done" << dendl; +} + +int FileStore::_check_global_replay_guard(coll_t cid, + const SequencerPosition& spos) +{ + if (!replaying || btrfs_stable_commits) + return 1; + + char fn[PATH_MAX]; + get_cdir(cid, fn, sizeof(fn)); + int fd = ::open(fn, O_RDONLY); + if (fd < 0) { + dout(10) << __func__ << ": " << cid << " dne" << dendl; + return 1; // if collection does not exist, there is no guard, and we can replay. + } + + char buf[100]; + int r = chain_fgetxattr(fd, GLOBAL_REPLAY_GUARD_XATTR, buf, sizeof(buf)); + if (r < 0) { + dout(20) << __func__ << " no xattr" << dendl; + assert(!m_filestore_fail_eio || r != -EIO); + return 1; // no xattr + } + bufferlist bl; + bl.append(buf, r); + + SequencerPosition opos; + bufferlist::iterator p = bl.begin(); + ::decode(opos, p); + + TEMP_FAILURE_RETRY(::close(fd)); + return spos >= opos ? 1 : -1; +} + + void FileStore::_set_replay_guard(coll_t cid, const SequencerPosition &spos, bool in_progress=false) @@ -2283,8 +2360,12 @@ int FileStore::_check_replay_guard(coll_t cid, hobject_t oid, const SequencerPos if (!replaying || btrfs_stable_commits) return 1; + int r = _check_global_replay_guard(cid, spos); + if (r < 0) + return r; + FDRef fd; - int r = lfn_open(cid, oid, false, &fd); + r = lfn_open(cid, oid, false, &fd); if (r < 0) { dout(10) << "_check_replay_guard " << cid << " " << oid << " dne" << dendl; return 1; // if file does not exist, there is no guard, and we can replay. 
@@ -4220,6 +4301,9 @@ int FileStore::_collection_rename(const coll_t &cid, const coll_t &ncid, get_cdir(cid, old_coll, sizeof(old_coll)); get_cdir(ncid, new_coll, sizeof(new_coll)); + _set_global_replay_guard(cid, spos); + _set_replay_guard(cid, spos); + if (_check_replay_guard(cid, spos) < 0) { return 0; } @@ -4743,6 +4827,7 @@ int FileStore::_split_collection(coll_t cid, if (srccmp < 0) return 0; + _set_global_replay_guard(cid, spos); _set_replay_guard(cid, spos, true); _set_replay_guard(dest, spos, true); diff --git a/src/os/FileStore.h b/src/os/FileStore.h index 5a2a0b88566f6..86d267dddf1cc 100644 --- a/src/os/FileStore.h +++ b/src/os/FileStore.h @@ -324,6 +324,8 @@ class FileStore : public JournalingObjectStore, void _set_replay_guard(coll_t cid, const SequencerPosition& spos, bool in_progress); + void _set_global_replay_guard(coll_t cid, + const SequencerPosition &spos); /// close a replay guard opened with in_progress=true void _close_replay_guard(int fd, const SequencerPosition& spos); @@ -348,6 +350,7 @@ class FileStore : public JournalingObjectStore, int _check_replay_guard(int fd, const SequencerPosition& spos); int _check_replay_guard(coll_t cid, const SequencerPosition& spos); int _check_replay_guard(coll_t cid, hobject_t oid, const SequencerPosition& pos); + int _check_global_replay_guard(coll_t cid, const SequencerPosition& spos); // ------------------ // objects diff --git a/src/os/ObjectStore.cc b/src/os/ObjectStore.cc index 8426e49901e1e..ae97b6b08d344 100644 --- a/src/os/ObjectStore.cc +++ b/src/os/ObjectStore.cc @@ -422,6 +422,20 @@ void ObjectStore::Transaction::dump(ceph::Formatter *f) } break; + case Transaction::OP_OMAP_RMKEYRANGE: + { + coll_t cid(i.get_cid()); + hobject_t oid = i.get_oid(); + string first, last; + first = i.get_key(); + last = i.get_key(); + f->dump_string("op_name", "op_omap_rmkeyrange"); + f->dump_stream("collection") << cid; + f->dump_stream("oid") << oid; + f->dump_string("first", first); + f->dump_string("last", 
last); + } + default: f->dump_string("op_name", "unknown"); f->dump_unsigned("op_code", op); diff --git a/src/os/ObjectStore.h b/src/os/ObjectStore.h index 7310d53b0eeda..6bfefa09a470a 100644 --- a/src/os/ObjectStore.h +++ b/src/os/ObjectStore.h @@ -67,6 +67,7 @@ enum { l_os_commit_len, l_os_commit_lat, l_os_j_full, + l_os_queue_lat, l_os_last, }; diff --git a/src/osd/OSD.cc b/src/osd/OSD.cc index f2cb90756642b..1ee4c09a63e1f 100644 --- a/src/osd/OSD.cc +++ b/src/osd/OSD.cc @@ -918,7 +918,7 @@ OSD::OSD(int id, Messenger *internal_messenger, Messenger *external_messenger, finished_lock("OSD::finished_lock"), test_ops_hook(NULL), op_wq(this, g_conf->osd_op_thread_timeout, &op_tp), - peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp, 200), + peering_wq(this, g_conf->osd_op_thread_timeout, &op_tp), map_lock("OSD::map_lock"), peer_map_epoch_lock("OSD::peer_map_epoch_lock"), debug_drop_pg_create_probability(g_conf->osd_debug_drop_pg_create_probability), @@ -1400,6 +1400,9 @@ void OSD::create_logger() osd_plb.add_u64_counter(l_osd_map, "map_messages"); // osdmap messages osd_plb.add_u64_counter(l_osd_mape, "map_message_epochs"); // osdmap epochs osd_plb.add_u64_counter(l_osd_mape_dup, "map_message_epoch_dups"); // dup osdmap epochs + osd_plb.add_u64_counter(l_osd_waiting_for_map, + "messages_delayed_for_map"); // ops delayed waiting for a new osdmap + osd_plb.add_time_avg(l_osd_peering_latency, "peering_latency"); logger = osd_plb.create_perf_counters(); g_ceph_context->get_perfcounters_collection()->add(logger); @@ -1773,7 +1776,7 @@ OSD::res_result OSD::_try_resurrect_pg( DeletingStateRef df; pg_t cur(pgid); while (true) { - df = service.deleting_pgs.lookup(pgid); + df = service.deleting_pgs.lookup(cur); if (df) break; if (!cur.ps()) @@ -1810,7 +1813,7 @@ OSD::res_result OSD::_try_resurrect_pg( << dendl; *resurrected = cur; *old_pg_state = df->old_pg_state; - service.deleting_pgs.remove(pgid); // PG is no longer being removed! 
+ service.deleting_pgs.remove(cur); // PG is no longer being removed! return RES_PARENT; } else { /* this is not a problem, failing to cancel proves that all objects @@ -2500,6 +2503,8 @@ void OSD::update_osd_stat() service.check_nearfull_warning(osd_stat); + op_tracker.get_age_ms_histogram(&osd_stat.op_queue_age_hist); + dout(20) << "update_osd_stat " << osd_stat << dendl; } @@ -3366,8 +3371,8 @@ void OSD::ms_handle_connect(Connection *con) bool OSD::ms_handle_reset(Connection *con) { - dout(1) << "OSD::ms_handle_reset()" << dendl; OSD::Session *session = (OSD::Session *)con->get_priv(); + dout(1) << "ms_handle_reset con " << con << " session " << session << dendl; if (!session) return false; session->wstate.reset(); @@ -4764,6 +4769,7 @@ void OSD::wait_for_new_map(OpRequestRef op) monc->renew_subs(); } + logger->inc(l_osd_waiting_for_map); waiting_for_osdmap.push_back(op); op->mark_delayed("wait for new map"); } @@ -5565,8 +5571,13 @@ bool OSD::require_same_or_newer_map(OpRequestRef op, epoch_t epoch) int from = m->get_source().num(); if (!osdmap->have_inst(from) || osdmap->get_cluster_addr(from) != m->get_source_inst().addr) { - dout(10) << "from dead osd." << from << ", marking down" << dendl; - cluster_messenger->mark_down(m->get_connection()); + dout(5) << "from dead osd." << from << ", marking down, " + << " msg was " << m->get_source_inst().addr + << " expected " << (osdmap->have_inst(from) ? 
osdmap->get_cluster_addr(from) : entity_addr_t()) + << dendl; + ConnectionRef con = m->get_connection(); + con->set_priv(NULL); // break ref <-> session cycle, if any + cluster_messenger->mark_down(con.get()); return false; } } @@ -5829,7 +5840,7 @@ bool OSD::compat_must_dispatch_immediately(PG *pg) continue; ConnectionRef conn = service.get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch()); - if (conn && !(conn->features & CEPH_FEATURE_INDEP_PG_MAP)) { + if (conn && !conn->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) { return true; } } @@ -5883,7 +5894,7 @@ void OSD::do_notifies( if (!con) continue; _share_map_outgoing(it->first, con.get(), curmap); - if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { + if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) { dout(7) << "do_notify osd." << it->first << " on " << it->second.size() << " PGs" << dendl; MOSDPGNotify *m = new MOSDPGNotify(curmap->get_epoch(), @@ -5923,7 +5934,7 @@ void OSD::do_queries(map< int, map >& query_map, if (!con) continue; _share_map_outgoing(who, con.get(), curmap); - if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { + if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) { dout(7) << "do_queries querying osd." 
<< who << " on " << pit->second.size() << " PGs" << dendl; MOSDPGQuery *m = new MOSDPGQuery(curmap->get_epoch(), pit->second); @@ -5962,7 +5973,7 @@ void OSD::do_infos(map > >& info if (!con) continue; _share_map_outgoing(p->first, con.get(), curmap); - if ((con->features & CEPH_FEATURE_INDEP_PG_MAP)) { + if (con->has_feature(CEPH_FEATURE_INDEP_PG_MAP)) { MOSDPGInfo *m = new MOSDPGInfo(curmap->get_epoch()); m->pg_list = p->second; cluster_messenger->send_message(m, con.get()); diff --git a/src/osd/OSD.h b/src/osd/OSD.h index 5114c99b66a39..238c5b4359486 100644 --- a/src/osd/OSD.h +++ b/src/osd/OSD.h @@ -112,6 +112,8 @@ enum { l_osd_mape, l_osd_mape_dup, + l_osd_waiting_for_map, + l_osd_peering_latency, l_osd_last, }; @@ -901,10 +903,9 @@ class OSD : public Dispatcher, list peering_queue; OSD *osd; set in_use; - const size_t batch_size; - PeeringWQ(OSD *o, time_t ti, ThreadPool *tp, size_t batch_size) + PeeringWQ(OSD *o, time_t ti, ThreadPool *tp) : ThreadPool::BatchWorkQueue( - "OSD::PeeringWQ", ti, ti*10, tp), osd(o), batch_size(batch_size) {} + "OSD::PeeringWQ", ti, ti*10, tp), osd(o) {} void _dequeue(PG *pg) { for (list::iterator i = peering_queue.begin(); @@ -929,7 +930,8 @@ class OSD : public Dispatcher, void _dequeue(list *out) { set got; for (list::iterator i = peering_queue.begin(); - i != peering_queue.end() && out->size() < batch_size; + i != peering_queue.end() && + out->size() < g_conf->osd_peering_wq_batch_size; ) { if (in_use.count(*i)) { ++i; diff --git a/src/osd/OpRequest.cc b/src/osd/OpRequest.cc index ea9beaacda3b0..3b8a8714d9221 100644 --- a/src/osd/OpRequest.cc +++ b/src/osd/OpRequest.cc @@ -183,6 +183,35 @@ bool OpTracker::check_ops_in_flight(std::vector &warning_vector) return warning_vector.size(); } +void OpTracker::get_age_ms_histogram(pow2_hist_t *h) +{ + Mutex::Locker locker(ops_in_flight_lock); + + h->clear(); + + utime_t now = ceph_clock_now(NULL); + unsigned bin = 30; + uint32_t lb = 1 << (bin-1); // lower bound for this bin + int 
count = 0; + for (xlist::iterator i = ops_in_flight.begin(); !i.end(); ++i) { + utime_t age = now - (*i)->received_time; + uint32_t ms = (long)(age * 1000.0); + if (ms >= lb) { + count++; + continue; + } + if (count) + h->set(bin, count); + while (lb > ms) { + bin--; + lb >>= 1; + } + count = 1; + } + if (count) + h->set(bin, count); +} + void OpRequest::dump(utime_t now, Formatter *f) const { Message *m = request; diff --git a/src/osd/OpRequest.h b/src/osd/OpRequest.h index ca419f34ff88c..47b050b853815 100644 --- a/src/osd/OpRequest.h +++ b/src/osd/OpRequest.h @@ -64,6 +64,8 @@ class OpTracker { void register_inflight_op(xlist::item *i); void unregister_inflight_op(OpRequest *i); + void get_age_ms_histogram(pow2_hist_t *h); + /** * Look for Ops which are too old, and insert warning * strings for each Op that is too old. diff --git a/src/osd/PG.cc b/src/osd/PG.cc index 46c6f39599799..2a07887a2a314 100644 --- a/src/osd/PG.cc +++ b/src/osd/PG.cc @@ -1132,8 +1132,9 @@ void PG::activate(ObjectStore::Transaction& t, dirty_info = true; dirty_big_info = true; // maybe - // clean up stray objects - clean_up_local(t); + // verify that there are no stray objects + if (is_primary()) + check_local(); // find out when we commit tfin.push_back(new C_PG_ActivateCommitted(this, query_epoch)); @@ -3328,7 +3329,7 @@ void PG::scrub(ThreadPool::TPHandle &handle) ConnectionRef con = osd->get_con_osd_cluster(acting[i], get_osdmap()->get_epoch()); if (!con) continue; - if (!(con->features & CEPH_FEATURE_CHUNKY_SCRUB)) { + if (!con->has_feature(CEPH_FEATURE_CHUNKY_SCRUB)) { dout(20) << "OSD " << acting[i] << " does not support chunky scrubs, falling back to classic" << dendl; @@ -5162,11 +5163,6 @@ PG::RecoveryState::Started::Started(my_context ctx) { state_name = "Started"; context< RecoveryMachine >().log_enter(state_name); - PG *pg = context< RecoveryMachine >().pg; - pg->start_flush( - context< RecoveryMachine >().get_cur_transaction(), - context< RecoveryMachine 
>().get_on_applied_context_list(), - context< RecoveryMachine >().get_on_safe_context_list()); } boost::statechart::result @@ -5429,6 +5425,9 @@ void PG::RecoveryState::Peering::exit() PG *pg = context< RecoveryMachine >().pg; pg->state_clear(PG_STATE_PEERING); pg->clear_probe_targets(); + + utime_t dur = ceph_clock_now(g_ceph_context) - enter_time; + pg->osd->logger->tinc(l_osd_peering_latency, dur); } @@ -5479,7 +5478,7 @@ PG::RecoveryState::WaitRemoteBackfillReserved::WaitRemoteBackfillReserved(my_con ConnectionRef con = pg->osd->get_con_osd_cluster( pg->backfill_target, pg->get_osdmap()->get_epoch()); if (con) { - if ((con->features & CEPH_FEATURE_BACKFILL_RESERVATION)) { + if (con->has_feature(CEPH_FEATURE_BACKFILL_RESERVATION)) { unsigned priority = pg->is_degraded() ? OSDService::BACKFILL_HIGH : OSDService::BACKFILL_LOW; pg->osd->send_message_osd_cluster( @@ -5735,7 +5734,7 @@ PG::RecoveryState::WaitRemoteRecoveryReserved::react(const RemoteRecoveryReserve if (acting_osd_it != context< Active >().sorted_acting_set.end()) { ConnectionRef con = pg->osd->get_con_osd_cluster(*acting_osd_it, pg->get_osdmap()->get_epoch()); if (con) { - if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { + if (con->has_feature(CEPH_FEATURE_RECOVERY_RESERVATION)) { pg->osd->send_message_osd_cluster( new MRecoveryReserve(MRecoveryReserve::REQUEST, pg->info.pgid, @@ -5783,7 +5782,7 @@ void PG::RecoveryState::Recovering::release_reservations() continue; ConnectionRef con = pg->osd->get_con_osd_cluster(*i, pg->get_osdmap()->get_epoch()); if (con) { - if ((con->features & CEPH_FEATURE_RECOVERY_RESERVATION)) { + if (con->has_feature(CEPH_FEATURE_RECOVERY_RESERVATION)) { pg->osd->send_message_osd_cluster( new MRecoveryReserve(MRecoveryReserve::RELEASE, pg->info.pgid, @@ -6130,6 +6129,12 @@ PG::RecoveryState::ReplicaActive::ReplicaActive(my_context ctx) state_name = "Started/ReplicaActive"; context< RecoveryMachine >().log_enter(state_name); + + PG *pg = context< RecoveryMachine 
>().pg; + pg->start_flush( + context< RecoveryMachine >().get_cur_transaction(), + context< RecoveryMachine >().get_on_applied_context_list(), + context< RecoveryMachine >().get_on_safe_context_list()); } @@ -6218,6 +6223,11 @@ PG::RecoveryState::Stray::Stray(my_context ctx) assert(!pg->is_active()); assert(!pg->is_peering()); assert(!pg->is_primary()); + if (!pg->is_replica()) // stray, need to flush for pulls + pg->start_flush( + context< RecoveryMachine >().get_cur_transaction(), + context< RecoveryMachine >().get_on_applied_context_list(), + context< RecoveryMachine >().get_on_safe_context_list()); } boost::statechart::result PG::RecoveryState::Stray::react(const MLogRec& logevt) @@ -6564,6 +6574,10 @@ boost::statechart::result PG::RecoveryState::GetLog::react(const GotLog&) msg->info, msg->log, msg->missing, newest_update_osd); } + pg->start_flush( + context< RecoveryMachine >().get_cur_transaction(), + context< RecoveryMachine >().get_on_applied_context_list(), + context< RecoveryMachine >().get_on_safe_context_list()); return transit< GetMissing >(); } diff --git a/src/osd/PG.h b/src/osd/PG.h index 2239e6b127585..e1f64dbc9fb6c 100644 --- a/src/osd/PG.h +++ b/src/osd/PG.h @@ -613,7 +613,7 @@ class PG { return pg_log.get_missing().num_missing() - missing_loc.size(); } - virtual void clean_up_local(ObjectStore::Transaction& t) = 0; + virtual void check_local() = 0; virtual int start_recovery_ops(int max, RecoveryCtx *prctx) = 0; diff --git a/src/osd/ReplicatedPG.cc b/src/osd/ReplicatedPG.cc index 918498edc5546..453fdacfb760d 100644 --- a/src/osd/ReplicatedPG.cc +++ b/src/osd/ReplicatedPG.cc @@ -821,6 +821,7 @@ void ReplicatedPG::do_op(OpRequestRef op) if (osd_op.op.op == CEPH_OSD_OP_LIST_SNAPS && m->get_snapid() != CEPH_SNAPDIR) { dout(10) << "LIST_SNAPS with incorrect context" << dendl; + put_object_context(obc); osd->reply_op_error(op, -EINVAL); return; } @@ -7502,16 +7503,19 @@ void ReplicatedPG::scan_range(hobject_t begin, int min, int max, BackfillInterva 
} -/** clean_up_local - * remove any objects that we're storing but shouldn't. - * as determined by log. +/** check_local + * + * verifies that stray objects have been deleted */ -void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) +void ReplicatedPG::check_local() { - dout(10) << "clean_up_local" << dendl; + dout(10) << __func__ << dendl; assert(info.last_update >= pg_log.get_tail()); // otherwise we need some help! + if (!g_conf->osd_debug_verify_stray_on_activate) + return; + // just scan the log. set did; for (list::const_reverse_iterator p = pg_log.get_log().log.rbegin(); @@ -7522,11 +7526,17 @@ void ReplicatedPG::clean_up_local(ObjectStore::Transaction& t) did.insert(p->soid); if (p->is_delete()) { - dout(10) << " deleting " << p->soid - << " when " << p->version << dendl; - remove_snap_mapped_object(t, p->soid); + dout(10) << " checking " << p->soid + << " at " << p->version << dendl; + struct stat st; + int r = osd->store->stat(coll, p->soid, &st); + if (r != -ENOENT) { + dout(10) << "Object " << p->soid << " exists, but should have been " + << "deleted" << dendl; + assert(0 == "erroneously present object"); + } } else { - // keep old(+missing) objects, just for kicks. 
+ // ignore old(+missing) objects } } } diff --git a/src/osd/ReplicatedPG.h b/src/osd/ReplicatedPG.h index 4cedf913217f0..8f0cf6ec31c2d 100644 --- a/src/osd/ReplicatedPG.h +++ b/src/osd/ReplicatedPG.h @@ -750,7 +750,7 @@ class ReplicatedPG : public PG { int prepare_transaction(OpContext *ctx); // pg on-disk content - void clean_up_local(ObjectStore::Transaction& t); + void check_local(); void _clear_recovery_state(); diff --git a/src/osd/osd_types.cc b/src/osd/osd_types.cc index 08f720edc4cde..fa3b7ecc45db4 100644 --- a/src/osd/osd_types.cc +++ b/src/osd/osd_types.cc @@ -102,6 +102,40 @@ void object_locator_t::generate_test_instances(list& o) } +// -- pow2_hist_t -- +void pow2_hist_t::dump(Formatter *f) const +{ + f->open_array_section("histogram"); + for (vector::const_iterator p = h.begin(); p != h.end(); ++p) + f->dump_int("count", *p); + f->close_section(); + f->dump_int("upper_bound", upper_bound()); +} + +void pow2_hist_t::encode(bufferlist& bl) const +{ + ENCODE_START(1, 1, bl); + ::encode(h, bl); + ENCODE_FINISH(bl); +} + +void pow2_hist_t::decode(bufferlist::iterator& p) +{ + DECODE_START(1, p); + ::decode(h, p); + DECODE_FINISH(p); +} + +void pow2_hist_t::generate_test_instances(list& ls) +{ + ls.push_back(new pow2_hist_t); + ls.push_back(new pow2_hist_t); + ls.back()->h.push_back(1); + ls.back()->h.push_back(3); + ls.back()->h.push_back(0); + ls.back()->h.push_back(2); +} + // -- osd_stat_t -- void osd_stat_t::dump(Formatter *f) const { @@ -118,11 +152,14 @@ void osd_stat_t::dump(Formatter *f) const f->close_section(); f->dump_int("snap_trim_queue_len", snap_trim_queue_len); f->dump_int("num_snap_trimming", num_snap_trimming); + f->open_object_section("op_queue_age_hist"); + op_queue_age_hist.dump(f); + f->close_section(); } void osd_stat_t::encode(bufferlist &bl) const { - ENCODE_START(2, 2, bl); + ENCODE_START(3, 2, bl); ::encode(kb, bl); ::encode(kb_used, bl); ::encode(kb_avail, bl); @@ -130,12 +167,13 @@ void osd_stat_t::encode(bufferlist &bl) const 
::encode(num_snap_trimming, bl); ::encode(hb_in, bl); ::encode(hb_out, bl); + ::encode(op_queue_age_hist, bl); ENCODE_FINISH(bl); } void osd_stat_t::decode(bufferlist::iterator &bl) { - DECODE_START_LEGACY_COMPAT_LEN(2, 2, 2, bl); + DECODE_START_LEGACY_COMPAT_LEN(3, 2, 2, bl); ::decode(kb, bl); ::decode(kb_used, bl); ::decode(kb_avail, bl); @@ -143,6 +181,8 @@ void osd_stat_t::decode(bufferlist::iterator &bl) ::decode(num_snap_trimming, bl); ::decode(hb_in, bl); ::decode(hb_out, bl); + if (struct_v >= 3) + ::decode(op_queue_age_hist, bl); DECODE_FINISH(bl); } diff --git a/src/osd/osd_types.h b/src/osd/osd_types.h index e3f46f9d601e2..ff6d70fa69a4a 100644 --- a/src/osd/osd_types.h +++ b/src/osd/osd_types.h @@ -509,6 +509,65 @@ inline ostream& operator<<(ostream& out, const eversion_t e) { } +/** + * power of 2 histogram + */ +struct pow2_hist_t { + /** + * histogram + * + * bin size is 2^index + * value is count of elements that are <= the current bin but > the previous bin. + */ + vector h; + +private: + /// expand to at least another's size + void _expand_to(unsigned s) { + if (s > h.size()) + h.resize(s, 0); + } + /// drop useless trailing 0's + void _contract() { + unsigned p = h.size(); + while (p > 0 && h[p-1] == 0) + --p; + h.resize(p); + } + +public: + void clear() { + h.clear(); + } + void set(int bin, int32_t v) { + _expand_to(bin + 1); + h[bin] = v; + _contract(); + } + + void add(const pow2_hist_t& o) { + _expand_to(o.h.size()); + for (unsigned p = 0; p < o.h.size(); ++p) + h[p] += o.h[p]; + _contract(); + } + void sub(const pow2_hist_t& o) { + _expand_to(o.h.size()); + for (unsigned p = 0; p < o.h.size(); ++p) + h[p] -= o.h[p]; + _contract(); + } + + int32_t upper_bound() const { + return 1 << h.size(); + } + + void dump(Formatter *f) const; + void encode(bufferlist &bl) const; + void decode(bufferlist::iterator &bl); + static void generate_test_instances(std::list& o); +}; +WRITE_CLASS_ENCODER(pow2_hist_t) /** osd_stat * aggregate stats for an osd @@ 
-518,6 +577,8 @@ struct osd_stat_t { vector hb_in, hb_out; int32_t snap_trim_queue_len, num_snap_trimming; + pow2_hist_t op_queue_age_hist; + osd_stat_t() : kb(0), kb_used(0), kb_avail(0), snap_trim_queue_len(0), num_snap_trimming(0) {} @@ -527,6 +588,7 @@ struct osd_stat_t { kb_avail += o.kb_avail; snap_trim_queue_len += o.snap_trim_queue_len; num_snap_trimming += o.num_snap_trimming; + op_queue_age_hist.add(o.op_queue_age_hist); } void sub(const osd_stat_t& o) { kb -= o.kb; @@ -534,6 +596,7 @@ struct osd_stat_t { kb_avail -= o.kb_avail; snap_trim_queue_len -= o.snap_trim_queue_len; num_snap_trimming -= o.num_snap_trimming; + op_queue_age_hist.sub(o.op_queue_age_hist); } void dump(Formatter *f) const; @@ -562,7 +625,9 @@ inline ostream& operator<<(ostream& out, const osd_stat_t& s) { return out << "osd_stat(" << kb_t(s.kb_used) << " used, " << kb_t(s.kb_avail) << " avail, " << kb_t(s.kb) << " total, " - << "peers " << s.hb_in << "/" << s.hb_out << ")"; + << "peers " << s.hb_in << "/" << s.hb_out + << " op hist " << s.op_queue_age_hist.h + << ")"; } diff --git a/src/pybind/ceph_argparse.py b/src/pybind/ceph_argparse.py index e99f72d7f0c21..73d1115f6456e 100644 --- a/src/pybind/ceph_argparse.py +++ b/src/pybind/ceph_argparse.py @@ -15,6 +15,7 @@ import copy import json import os +import re import socket import stat import sys @@ -175,21 +176,31 @@ def __str__(self): class CephString(CephArgtype): """ - String; pretty generic. + String; pretty generic. 
goodchars is a RE char class of valid chars """ - def __init__(self, badchars=''): - self.badchars = badchars + def __init__(self, goodchars=''): + from string import printable + try: + re.compile(goodchars) + except: + raise ValueError('CephString(): "{0}" is not a valid RE'.\ + format(goodchars)) + self.goodchars = goodchars + self.goodset = frozenset( + [c for c in printable if re.match(goodchars, c)] + ) def valid(self, s, partial=False): - for c in self.badchars: - if c in s: - raise ArgumentFormat("bad char {0} in {1}".format(c, s)) + sset = set(s) + if self.goodset and not sset <= self.goodset: + raise ArgumentFormat("invalid chars {0} in {1}".\ + format(''.join(sset - self.goodset), s)) self.val = s def __str__(self): b = '' - if len(self.badchars): - b = '(without chars in {0})'.format(self.badchars) + if self.goodchars: + b += '(goodchars {0})'.format(self.goodchars) return ''.format(b) class CephSocketpath(CephArgtype): @@ -794,7 +805,7 @@ def validate(args, signature, partial=False): # hm, but it was required, so quit if partial: return d - raise ArgumentFormat('{0} not valid argument {1}: {2}'.format(str(myarg), desc, e)) + raise e # valid arg acquired. 
Store in dict, as a list if multivalued if desc.N: @@ -852,19 +863,19 @@ def validate_command(parsed_args, sigdict, args, verbose=False): sig = cmd['sig'] helptext = cmd['help'] try: - valid_dict = validate(args, sig, verbose) + valid_dict = validate(args, sig) found = cmd break + except ArgumentPrefix: + # ignore prefix mismatches; we just haven't found + # the right command yet + pass except ArgumentError as e: # prefixes matched, but some other arg didn't; - # this is interesting information if verbose - if verbose: - print >> sys.stderr, '{0}: invalid command'.\ - format(' '.join(args)) - print >> sys.stderr, '{0}'.format(e) - print >> sys.stderr, "did you mean {0}?\n\t{1}".\ - format(concise_sig(sig), helptext) - pass + # stop now, because we have the right command but + # some other input is invalid + print >> sys.stderr, "Invalid command: ", str(e) + return {} if found: break @@ -878,9 +889,6 @@ def validate_command(parsed_args, sigdict, args, verbose=False): if parsed_args.output_format: valid_dict['format'] = parsed_args.output_format - if parsed_args.threshold: - valid_dict['threshold'] = parsed_args.threshold - return valid_dict def send_command(cluster, target=('mon', ''), cmd=None, inbuf='', timeout=0, diff --git a/src/rgw/rgw_admin.cc b/src/rgw/rgw_admin.cc index 0b7cb143400da..abf9e26c8e3f8 100644 --- a/src/rgw/rgw_admin.cc +++ b/src/rgw/rgw_admin.cc @@ -126,6 +126,8 @@ void _usage() cerr << " mdlog trim\n"; cerr << " replica mdlog get/delete\n"; cerr << " replica datalog get/delete\n"; + cerr << " --rgw-region= region in which radosgw is running\n"; + cerr << " --rgw-zone= zone in which radosgw is running\n"; cerr << " --fix besides checking bucket index, will also fix it\n"; cerr << " --check-objects bucket check: rebuilds bucket index according to\n"; cerr << " actual objects state\n"; diff --git a/src/rgw/rgw_bucket.cc b/src/rgw/rgw_bucket.cc index 23354233c963e..aae7d31e21ccc 100644 --- a/src/rgw/rgw_bucket.cc +++ b/src/rgw/rgw_bucket.cc @@ 
-1476,8 +1476,13 @@ class RGWBucketMetadataHandler : public RGWMetadataHandler { int ret = store->list_raw_objects(store->zone.domain_root, no_filter, max, info->ctx, unfiltered_keys, truncated); - if (ret < 0) + if (ret < 0 && ret != -ENOENT) return ret; + if (ret == -ENOENT) { + if (truncated) + *truncated = false; + return 0; + } // now filter out the system entries list::iterator iter; @@ -1618,8 +1623,13 @@ class RGWBucketInstanceMetadataHandler : public RGWMetadataHandler { int ret = store->list_raw_objects(store->zone.domain_root, no_filter, max, info->ctx, unfiltered_keys, truncated); - if (ret < 0) + if (ret < 0 && ret != -ENOENT) return ret; + if (ret == -ENOENT) { + if (truncated) + *truncated = false; + return 0; + } int prefix_size = sizeof(RGW_BUCKET_INSTANCE_MD_PREFIX) - 1; // now filter in the relevant entries diff --git a/src/rgw/rgw_main.cc b/src/rgw/rgw_main.cc index e3c853c19d7f8..514e9e47171cd 100644 --- a/src/rgw/rgw_main.cc +++ b/src/rgw/rgw_main.cc @@ -417,6 +417,8 @@ int usage() { cerr << "usage: radosgw [options...]" << std::endl; cerr << "options:\n"; + cerr << " --rgw-region= region in which radosgw runs\n"; + cerr << " --rgw-zone= zone in which radosgw runs\n"; generic_server_usage(); return 0; } diff --git a/src/rgw/rgw_rados.cc b/src/rgw/rgw_rados.cc index b3e297effa4be..067a1adbabdeb 100644 --- a/src/rgw/rgw_rados.cc +++ b/src/rgw/rgw_rados.cc @@ -645,7 +645,8 @@ int RGWPutObjProcessor_Atomic::write_data(bufferlist& bl, off_t ofs, void **phan return RGWPutObjProcessor_Aio::handle_obj_data(cur_obj, bl, ofs - cur_part_ofs, ofs, phandle); } -int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) { +int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **phandle) +{ *phandle = NULL; if (extra_data_len) { size_t extra_len = bl.length(); @@ -660,7 +661,6 @@ int RGWPutObjProcessor_Atomic::handle_data(bufferlist& bl, off_t ofs, void **pha if (bl.length() == 0) { return 0; } - ofs = 
extra_data_bl.length(); } pending_data_bl.claim_append(bl); @@ -1908,11 +1908,29 @@ int RGWRados::select_new_bucket_location(RGWUserInfo& user_info, const string& r } } - /* yay, user is permitted, now just make sure that zone has this rule configured. We're + if (pselected_rule) + *pselected_rule = rule; + + return set_bucket_location_by_rule(rule, bucket_name, bucket); +} + +int RGWRados::set_bucket_location_by_rule(const string& location_rule, const std::string& bucket_name, rgw_bucket& bucket) +{ + bucket.name = bucket_name; + + if (location_rule.empty()) { + /* we can only reach here if we're trying to set a bucket location from a bucket + * created on a different zone, using a legacy / default pool configuration + */ + return select_legacy_bucket_placement(bucket_name, bucket); + } + + /* + * make sure that zone has this rule configured. We're * checking it for the local zone, because that's where this bucket object is going to * reside. */ - map::iterator piter = zone.placement_pools.find(rule); + map::iterator piter = zone.placement_pools.find(location_rule); if (piter == zone.placement_pools.end()) { /* couldn't find, means we cannot really place data for this bucket in this zone */ if ((region_name.empty() && region.is_master) || @@ -1926,22 +1944,6 @@ int RGWRados::select_new_bucket_location(RGWUserInfo& user_info, const string& r } } - if (pselected_rule) - *pselected_rule = rule; - - return set_bucket_location_by_rule(rule, bucket_name, bucket); -} - -int RGWRados::set_bucket_location_by_rule(const string& location_rule, const std::string& bucket_name, rgw_bucket& bucket) -{ - bucket.name = bucket_name; - - map::iterator piter = zone.placement_pools.find(location_rule); - if (piter == zone.placement_pools.end()) { - /* silently ignore, bucket will not reside in this zone */ - return 0; - } - RGWZonePlacementInfo& placement_info = piter->second; bucket.data_pool = placement_info.data_pool; @@ -1953,16 +1955,24 @@ int 
RGWRados::set_bucket_location_by_rule(const string& location_rule, const std int RGWRados::select_bucket_placement(RGWUserInfo& user_info, const string& region_name, const string& placement_rule, const string& bucket_name, rgw_bucket& bucket, string *pselected_rule) +{ + if (!zone.placement_pools.empty()) { + return select_new_bucket_location(user_info, region_name, placement_rule, bucket_name, bucket, pselected_rule); + } + + if (pselected_rule) + pselected_rule->clear(); + + return select_legacy_bucket_placement(bucket_name, bucket); +} + +int RGWRados::select_legacy_bucket_placement(const string& bucket_name, rgw_bucket& bucket) { bufferlist map_bl; map m; string pool_name; bool write_map = false; - if (!zone.placement_pools.empty()) { - return select_new_bucket_location(user_info, region_name, placement_rule, bucket_name, bucket, pselected_rule); - } - rgw_obj obj(zone.domain_root, avail_pools); int ret = rgw_get_system_obj(this, NULL, zone.domain_root, avail_pools, map_bl, NULL, NULL); @@ -2291,6 +2301,11 @@ int RGWRados::put_obj_meta_impl(void *ctx, rgw_obj& obj, uint64_t size, if (ret < 0) { ldout(cct, 0) << "ERROR: complete_update_index_cancel() returned ret=" << ret << dendl; } + /* we lost in a race, object was already overwritten, we + * should treat it as a success + */ + if (r == -ECANCELED) + r = 0; return r; } diff --git a/src/rgw/rgw_rados.h b/src/rgw/rgw_rados.h index 3e747d2b8fb17..fb1a1756ba8aa 100644 --- a/src/rgw/rgw_rados.h +++ b/src/rgw/rgw_rados.h @@ -973,6 +973,7 @@ class RGWRados virtual int init_bucket_index(rgw_bucket& bucket); int select_bucket_placement(RGWUserInfo& user_info, const string& region_name, const std::string& rule, const std::string& bucket_name, rgw_bucket& bucket, string *pselected_rule); + int select_legacy_bucket_placement(const string& bucket_name, rgw_bucket& bucket); int select_new_bucket_location(RGWUserInfo& user_info, const string& region_name, const string& rule, const std::string& bucket_name, rgw_bucket& 
bucket, string *pselected_rule); int set_bucket_location_by_rule(const string& location_rule, const std::string& bucket_name, rgw_bucket& bucket); diff --git a/src/test/cli/osdmaptool/clobber.t b/src/test/cli/osdmaptool/clobber.t index 1092bd6dc8893..9bbe4d4ceeb7f 100644 --- a/src/test/cli/osdmaptool/clobber.t +++ b/src/test/cli/osdmaptool/clobber.t @@ -19,9 +19,9 @@ modified \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ (re) flags - pool 0 'data' rep size 2 min_size 1 crush_ruleset 0 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 flags 1 crash_replay_interval 45 - pool 1 'metadata' rep size 2 min_size 1 crush_ruleset 1 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 flags 1 - pool 2 'rbd' rep size 2 min_size 1 crush_ruleset 2 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 flags 1 + pool 0 'data' rep size 2 min_size 1 crush_ruleset 0 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 crash_replay_interval 45 + pool 1 'metadata' rep size 2 min_size 1 crush_ruleset 1 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 + pool 2 'rbd' rep size 2 min_size 1 crush_ruleset 2 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 max_osd 3 @@ -41,9 +41,9 @@ modified \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ (re) flags - pool 0 'data' rep size 2 min_size 1 crush_ruleset 0 object_hash rjenkins pg_num 64 pgp_num 64 last_change 0 owner 0 flags 1 crash_replay_interval 45 - pool 1 'metadata' rep size 2 min_size 1 crush_ruleset 1 object_hash rjenkins pg_num 64 pgp_num 64 last_change 0 owner 0 flags 1 - pool 2 'rbd' rep size 2 min_size 1 crush_ruleset 2 object_hash rjenkins pg_num 64 pgp_num 64 last_change 0 owner 0 flags 1 + pool 0 'data' rep size 2 min_size 1 crush_ruleset 0 object_hash rjenkins pg_num 64 pgp_num 64 last_change 0 owner 0 crash_replay_interval 45 + pool 1 'metadata' rep size 2 min_size 1 crush_ruleset 1 object_hash rjenkins pg_num 64 pgp_num 64 last_change 0 owner 0 + pool 2 
'rbd' rep size 2 min_size 1 crush_ruleset 2 object_hash rjenkins pg_num 64 pgp_num 64 last_change 0 owner 0 max_osd 1 diff --git a/src/test/cli/osdmaptool/create-print.t b/src/test/cli/osdmaptool/create-print.t index b312d3c807a4a..81b9194735936 100644 --- a/src/test/cli/osdmaptool/create-print.t +++ b/src/test/cli/osdmaptool/create-print.t @@ -10,9 +10,9 @@ modified \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ (re) flags - pool 0 'data' rep size 2 min_size 1 crush_ruleset 0 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 flags 1 crash_replay_interval 45 - pool 1 'metadata' rep size 2 min_size 1 crush_ruleset 1 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 flags 1 - pool 2 'rbd' rep size 2 min_size 1 crush_ruleset 2 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 flags 1 + pool 0 'data' rep size 2 min_size 1 crush_ruleset 0 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 crash_replay_interval 45 + pool 1 'metadata' rep size 2 min_size 1 crush_ruleset 1 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 + pool 2 'rbd' rep size 2 min_size 1 crush_ruleset 2 object_hash rjenkins pg_num 192 pgp_num 192 last_change 0 owner 0 max_osd 3 diff --git a/src/test/cli/radosgw-admin/help.t b/src/test/cli/radosgw-admin/help.t index ba77e7b86c2d9..21f51e68c358e 100644 --- a/src/test/cli/radosgw-admin/help.t +++ b/src/test/cli/radosgw-admin/help.t @@ -87,6 +87,8 @@ mdlog trim replica mdlog get/delete replica datalog get/delete + --rgw-region= region in which radosgw is running + --rgw-zone= zone in which radosgw is running --fix besides checking bucket index, will also fix it --check-objects bucket check: rebuilds bucket index according to actual objects state diff --git a/src/test/cls_lock/test_cls_lock.cc b/src/test/cls_lock/test_cls_lock.cc index 39d3cebde105c..ead03bb218354 100644 --- a/src/test/cls_lock/test_cls_lock.cc +++ b/src/test/cls_lock/test_cls_lock.cc @@ -16,6 +16,7 @@ #include 
#include "include/types.h" +#include "common/Clock.h" #include "msg/msg_types.h" #include "include/rados/librados.hpp" @@ -280,11 +281,19 @@ TEST(ClsLock, TestLockDuration) { string oid = "foo"; Lock l("lock"); - l.set_duration(utime_t(5, 0)); + utime_t dur(5, 0); + l.set_duration(dur); + utime_t start = ceph_clock_now(NULL); ASSERT_EQ(0, l.lock_exclusive(&ioctx, oid)); - ASSERT_EQ(-EEXIST, l.lock_exclusive(&ioctx, oid)); + int r = l.lock_exclusive(&ioctx, oid); + if (r == 0) { + // it's possible to get success if we were just really slow... + ASSERT_TRUE(ceph_clock_now(NULL) > start + dur); + } else { + ASSERT_EQ(-EEXIST, r); + } - sleep(5); + sleep(dur.sec()); ASSERT_EQ(0, l.lock_exclusive(&ioctx, oid)); ASSERT_EQ(0, destroy_one_pool_pp(pool_name, cluster)); diff --git a/src/test/encoding/types.h b/src/test/encoding/types.h index fd10d28f11952..e0bc0a149a8d5 100644 --- a/src/test/encoding/types.h +++ b/src/test/encoding/types.h @@ -34,6 +34,7 @@ TYPE(osd_reqid_t) TYPE(object_locator_t) TYPE(pg_t) TYPE(coll_t) +TYPE(pow2_hist_t) TYPE(osd_stat_t) TYPE(OSDSuperblock) TYPE_FEATUREFUL(pool_snap_info_t)