- 关于乱序(shuffle)与随机采样(sample)的一点探究 - xybaby - 博客园
Fisher–Yates shuffle - Wikipedia
- Knuth-Durstenfeld Shuffle 是一个“原地”(in-place)算法
- 伪代码
To shuffle an array a of n elements (indices 0..n-1):
for i from n−1 downto 1 do j ← random integer such that 0 ≤ j ≤ i exchange(a[i], a[j])
- Python 实现
def Knuth_Durstenfeld_shuffle(a): """Fisher–Yates Shuffle Args: a(list): """ for i in range(len(a) - 1, 0, -1): j = random.randint(0, i) a[i], a[j] = a[j], a[i] # return a
- 正确性证明:
- 即证明:任何一个元素 shuffle 之后出现在任意位置的概率都是
1/N
- 证明:
第1次:`i == n-1` 在 `[0, n-1]` 这 `n` 个数中随机选取一个数 `j`, 此时每个数**第1次**被抽到的概率为 `1/n` 交换`(a[j], a[n-1])` 该操作表明任何一个元素有 `1/n` 的概率被放在 `n-1` 位置 第2次:`i == n-2` 在 `[0, n-2]` 这 `n-1` 个数中随机选取一个数 `j`, 此时每个数**第2次**被抽到的概率为 `(n-1)/n * 1/(n-1) = 1/n` (其中 `(n-1)/n` 是第1次没被抽到的概率) 交换`(a[j], a[n-2])` 该操作表明任何一个元素有 `1/n` 的概率被放在 `n-2` 位置 第3次:`i == n-3` 在 `[0, n-3]` 这 `n-3` 个数中随机选取一个数 `j`, 此时每个数**第3次**被抽到的概率为 `(n-1)/n * (n-2)/(n-1) * 1/(n-2) = 1/n` (其中 `(n-1)/n` 是第1次没被抽到的概率,`(n-2)/(n-1)` 是第2次没被抽到的概率) 交换`(a[j], a[n-3])` 该操作表明任何一个元素有 `1/n` 的概率被放在 `n-3` 位置 ...
- 即证明:任何一个元素 shuffle 之后出现在任意位置的概率都是
- Inside-Out Shuffle 算法会返回一个打乱的副本
- 伪代码
Input: source for i from 0 to n − 1 do j ← random integer such that 0 ≤ j ≤ i if j ≠ i a[i] ← a[j] a[j] ← source[i]
- Python 实现
def Inside_Out_shuffle(src): """""" a = src[:] # copy for i in range(len(a)): j = random.randint(0, i) if j != i: # 比较的判断可以省略,有时赋值操作比比较更快 a[i] = a[j] a[j] = src[i] return a
# 无比较版本 def Inside_Out_shuffle(src): """""" a = src[:] # copy for i in range(len(a)): j = random.randint(0, i) a[i] = a[j] a[j] = src[i] # src[i] == 被赋值前的 a[i] # 可以看到 Inside_Out_shuffle 本质上就是 Knuth-Durstenfeld Shuffle 的拷贝版本 return a
- 正确性证明 TODO
- 所谓无限,指的是
n=len(src)
未知的情况 - Python 实现
def Inside_Out_shuffle_inf(src): """""" ret = [] while True: try: j = random.randint(0, len(ret)) if j == len(ret): ret.append(next(src)) else: ret.append(ret[j]) ret[j] = next(src) except StopIteration: break return ret
问题描述
从 n 个数中等概率的抽取 m 个数(m < n),每个数被抽取的概率为 `m/n`
- 无放回会改变原始数据,如果不想修改原始数据,需要额外复制一份
思路 1:类似 Knuth-Durstenfeld Shuffle
- 跟洗牌一样,随机从序列中取出一个元素,同时从原序列中删除
- 该方法需要修改原数据
- 如果不能修改原数据,可以考虑复制一份数据(仅当 n 比较小时)
def sample(src): """""" n = len(src) ret = [] for i in range(n): j = random.randrange(n - i) # 0 <= j < n-i ret.append(src[j]) src.pop(j) return ret
- 证明 TODO
思路 2:类似 Inside-Out Shuffle
- Python
random.sample()
中当n <= 4**ceil(log(3*m, 4))
时的采用的方法 - 类似 Inside-Out 算法的方式从原数组中抽取 m 个数
def sample(arr, m): """""" src = list(arr) # 为了防止修改原数据,拷贝一个副本 n = len(src) ret = [None] * m for i in range(m): j = random.randrange(n - i) # 0 <= j < n-i ret[i] = src[j] src[j], src[n - i - 1] = src[n - i - 1], src[j] # 保证每个值都可能被抽到 # src[j] = src[n - i - 1] # 因为使用的是副本,所以直接覆盖也可以 return ret
思路
- Python
random.sample()
中当n > 4**ceil(log(3*m, 4))
时采用的策略 - 使用一个
set
记录已经被抽到的位置,如果该位置已经存在,则继续def sample(arr, m): """""" n = len(arr) selected = set() ret = [None] * m for i in range(m): j = random.randrange(n) # 0 <= j < n while j in selected: j = random.randrange(n) selected.add(j) ret[i] = arr[j] return ret
- 如果
n < 4**ceil(log(3*m, 4))
时采用这个策略,可能会产生类似 Hash 冲突的问题,导致调用随机数方法的次数过多求解调用
rangd()
的次数是一个期望问题蓄水池抽样及实现 - handspeaker - 博客园
问题描述
给出一个数据流,这个数据流的长度为`n`,`n`很大或者未知(在不断增加),并且对该数据流中每个数据只能访问一次。
请写出一个抽样算法,从该数据流中抽取`m`个数据,使得所有数据被选中的概率相等(以概率`m/n`被选中)。
PS: 假设提供一个能返回**大随机整数**的函数`rand()`
基本思路 ->
等概率采样
蓄水池采样算法
- 伪代码
从`n`个元素中随机的等概率的抽取`m`个元素,其中`n`可能不确定 1. 将第 1 到 m 个元素加入“蓄水池”(作为被选中的元素) 2. 从第 m+i 个元素开始: 每个元素有 m/(m+i) 的概率被选中,若选中则随机替换掉蓄水池中的一个元素 3. 重复以上操作直到遍历完所有元素 > 根据以上伪代码,似乎还是要得到全部数据才能完成采样,问题出在哪里?实际上第`m+i`个元素被选中的概率是`m/(m+i)`,与`n`无关;即使`n`在不断增加,你依然可以获得达到`n`之前每个状态的结果。
- Python 实现
def Reservoir_sample(src, m): """""" ret = [] for i, item in enumerate(src): if i < m: ret.append(item) else: j = random.randint(0, i) # i == m+k if j < m: # 只要 j < m 就能加入 ret 表明 i >= m 之后的元素被选中的概率为 m/i = m/(m+k) ret[j] = src[i] return ret
- 证明:该算法保证每个元素以
m/n
的概率被选中n <= m
时,每个元素被 100% 选中n > m
时,n=m+1
时,根据算法定义,第m+1
个元素被选中的概率=m/m+1
,而前m
个元素被选中的概率=1-被第m+1个元素替换的概率
=1-(m/m+1)*(1/m)=m/m+1
;说明前m+1
个元素被选中的概率相等。n=m+k
时,第m+k
个元素被选中的概率=m/m+k=m/n
,前(n-1)
个元素被选中的概率=1-被第n个元素替换的概率
=1-(m/n)*(1/m)=m/n
;说明所有n
个元素被选中的概率相等。
蓄水池抽样及实现 - handspeaker - 博客园
-
图示
- 上面的线段按频度进行分割,下面为等距分割
- 往下方的线段随机打点(均匀分布),根据点所落在的区间对应到上方的线段,来确定采样的对象
Word2Vec 中负采样使用的方法
-
Python 实现
import random from collections import Counter def sample_non_balance(src, m): """""" cnt = Counter(src) # 样本计数 # 维护一个反向字典 table = dict() i = 0 for k, v in cnt.items(): for _ in range(v): table[i] = k i += 1 # 采样 m 次,m 可能大于 n n = len(src) ret = [] for _ in range(m): j = random.randrange(n) ret.append(table[j]) return ret
-
测试
src = [1] * 100 + [2] * 200 + [3] * 300 ret = sample_non_balance(src, 100000) # 采样 100000 次 ret_r = Counter(ret) for k, v in ret_r.items(): print(v / len(ret)) ''' 0.49926 0.33362 0.16712 '''
- 0 被采样 1 次,1 被采样 2 次,2 被采样 3 次,...
法 1
- 类似查表法的思路
def solve(n): table = dict() index = 0 for i in range(0, n): for _ in range(i + 1): table[index] = i index += 1 i = random.randrange((1 + n) * n / 2) return table[i] cnt = 1000000 ret = Counter([solve(4) for _ in range(cnt)]) for k, v in sorted(ret.items()): print(k, v / cnt) ''' 0 0.099951 1 0.200028 2 0.300225 3 0.399796 '''
法 2
- Python
def foo(n): while True: i = random.randrange(n) j = random.randrange(n) if i + j < n: return i + j cnt = 1000000 ret = Counter([foo(4) for _ in range(cnt)]) for k, v in sorted(ret.items()): print(k, v / cnt) ''' 0 0.099849 1 0.199983 2 0.299922 3 0.400246 '''
面试中随机数"等概率"vs"不等概率"生成问题 - hellogiser - 博客园
说明:
rand_n()
能够等概率生成[0, n)
内的整数,rand_m()
同理
假设 rand_m()
由以下方式实现
import random
def rand_m(m):
""""""
o = random.randrange(m)
return o
- 这种情况比较简单
- 因为
m >= n
,只要用rand_m()
生成的数在[0, n)
范围就返回,反之重新生成def rand_n(n): o = rand_m(5) while o >= n: o = rand_m(5) return o # 测试代码 cnt = 1000000 d = Counter([rand_n(3) for _ in range(cnt)]) for k, v in d.items(): print(k, v / cnt) print() ''' 0 0.33439 1 0.332293 2 0.333317 '''
-
以下方法要求
n <= m*m
-
先看代码再说思路(暂时忽略注释的代码):
def rand_n(n, m): """""" o = rand_m(m) * m + rand_m(m) # 错误写法: rand_m(m) * (m+1) # t = (m*m) // n * n # t 为小于 m*m 的 n 的最大整数倍 # while o >= t: while o >= n: o = rand_m(m) * m + rand_m(m) # o = o % n return o
-
思路
rand_m(m) * m + rand_m(m)
的意思是先等概率生成0, m, 2m, .. (m-1)*m
,然后在加上rand_m(m)
;最终效果相当于等概率生成[0, m*m)
之间的数- 然后再按照
m > n
时的做法生成n
-
注意到,如果
n << m*m
时,这个方法会产生大量废操作 -
一个优化方法就是等概率生成
t*n
中的数,然后对n
取模def rand_n(n, m): """""" o = rand_m(m) * m + rand_m(m) # 错误写法: rand_m(m) * (m+1) t = (m*m) // n * n # t 为小于 m*m 的 n 的最大整数倍 while o >= t: o = rand_m(m) * m + rand_m(m) o = o % n return o # 测试代码 cnt = 100000 d = Counter([rand_n(7, 5) for _ in range(cnt)]) for k, v in d.items(): print(k, v / cnt) print() ''' 3 0.14584 4 0.14105 0 0.14178 6 0.14287 1 0.14273 2 0.14088 5 0.14485 '''
问题 1
non_rand_01() 能够以不同的概率的生成 0/1;
请利用 non_rand_01() 等概率产生 0/1。
non_rand_01()
测试代码def non_rand_01(p=0.6): """以 p 的概率产生 1,1-p 的涵概率产生 0""" r = random.random() if r < p: return 1 else: return 0
- Python
// non_rand_01() 能以不同的概率生成 0/1 // rand_01(): 利用 non_rand_01() 等概率产生 0/1 def rand_01(): """利用 non_rand_01() 等概率产生 0/1""" while True: i1 = non_rand_01() i2 = non_rand_01() if i1 == 1 and i2 == 0: return 1 if i1 == 0 and i2 == 1: return 0 # 测试代码 cnt = 1000000 ret = Counter([rand_01() for _ in range(cnt)]) for k, v in ret.items(): print(k, v / cnt) ''' 1 0.499732 0 0.500268 '''
问题 2
non_rand_01() 能够以不同的概率的生成 0/1;
给定 n,请利用 non_rand_01() 等概率产生 0~n 之间的数值。
- 计算 n 的二进制位数,记为
k
;然后在问题 1 的基础上,产生长度为 k 的随机 0/1 数列 - 注意,产生的随机数可能大于
n
,当大于n
时,则丢弃并重新生成def rand_n(n): """利用 rand_01 等概率产生 [0, n) 中的随机数""" while True: k = int(math.log2(n)) + 1 ans = 0 for i in range(k): ans <<= 1 ans += rand_01() if ans < n: return ans # 测试代码 cnt = 1000000 ret = Counter([rand_n(10) for _ in range(cnt)]) for k, v in sorted(ret.items()): print(k, v / cnt) ''' 0 0.099953 1 0.100131 2 0.100158 3 0.100509 4 0.099724 5 0.099857 6 0.099917 7 0.099585 8 0.10014 9 0.100026 '''