DBComponentSystem.cs 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252
  1. using System;
  2. using System.Collections.Generic;
  3. using System.Linq.Expressions;
  4. using MongoDB.Driver;
  5. namespace ET.Server
  6. {
  7. [FriendOf(typeof(DBComponent))]
  8. public static class DBComponentSystem
  9. {
  /// <summary>
  /// Awake handler for <see cref="DBComponent"/>: builds the MongoDB client from the
  /// connection string and stores both the client and the named database on the component.
  /// </summary>
  public class DBComponentAwakeSystem : AwakeSystem<DBComponent, string, string, int>
  {
      /// <summary>
      /// Initializes <paramref name="self"/> with a new <see cref="MongoClient"/> and its database handle.
      /// </summary>
      /// <param name="self">Component being awakened.</param>
      /// <param name="dbConnection">Connection string passed to the <see cref="MongoClient"/> constructor.</param>
      /// <param name="dbName">Name of the database to obtain from the client.</param>
      /// <param name="zone">Not read by this method.</param>
      protected override void Awake(DBComponent self, string dbConnection, string dbName, int zone)
      {
          MongoClient client = new MongoClient(dbConnection);
          self.mongoClient = client;
          self.database = client.GetDatabase(dbName);
      }
  }
  /// <summary>
  /// Returns the typed collection for <typeparamref name="T"/> from the component's database.
  /// </summary>
  /// <param name="self">Component holding the database handle.</param>
  /// <param name="collection">Collection name; when null, the simple type name of <typeparamref name="T"/> is used.</param>
  /// <returns>The collection handle obtained from <c>self.database</c>.</returns>
  private static IMongoCollection<T> GetCollection<T>(this DBComponent self, string collection = null)
  {
      string name = collection ?? typeof(T).Name;
      return self.database.GetCollection<T>(name);
  }
  /// <summary>
  /// Returns the collection called <paramref name="name"/>, typed as <see cref="Entity"/>.
  /// </summary>
  /// <param name="self">Component holding the database handle.</param>
  /// <param name="name">Collection name, passed through unchanged.</param>
  /// <returns>The collection handle obtained from <c>self.database</c>.</returns>
  private static IMongoCollection<Entity> GetCollection(this DBComponent self, string name)
      => self.database.GetCollection<Entity>(name);
  26. #region Query
  /// <summary>
  /// Looks up a single document of type <typeparamref name="T"/> whose <c>Id</c> equals <paramref name="id"/>.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="id">Document id; also used (modulo <c>DBComponent.TaskCount</c>) as the DB coroutine-lock key.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>The first matching document, or the default value of <typeparamref name="T"/> when the cursor is empty.</returns>
  public static async ETTask<T> Query<T>(this DBComponent self, long id, string collection = null) where T : Entity
  {
      long lockKey = id % DBComponent.TaskCount;

      // The lock object returned by Wait is disposed when the using block exits.
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          IAsyncCursor<T> matches = await documents.FindAsync(doc => doc.Id == id);
          return await matches.FirstOrDefaultAsync();
      }
  }
  /// <summary>
  /// Returns every document of type <typeparamref name="T"/> that satisfies <paramref name="filter"/>.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="filter">Predicate expression handed to the driver's <c>FindAsync</c>.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>All matching documents, materialized into a list.</returns>
  /// <remarks>
  /// No caller-supplied key exists for this overload, so the DB coroutine-lock key is
  /// <c>RandomGenerator.RandInt64() % DBComponent.TaskCount</c>.
  /// </remarks>
  public static async ETTask<List<T>> Query<T>(this DBComponent self, Expression<Func<T, bool>> filter, string collection = null)
          where T : Entity
  {
      long lockKey = RandomGenerator.RandInt64() % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          IAsyncCursor<T> matches = await documents.FindAsync(filter);
          return await matches.ToListAsync();
      }
  }
  /// <summary>
  /// Returns every document of type <typeparamref name="T"/> that satisfies <paramref name="filter"/>,
  /// using a caller-chosen key for the DB coroutine lock.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="taskId">Value reduced modulo <c>DBComponent.TaskCount</c> to form the lock key.</param>
  /// <param name="filter">Predicate expression handed to the driver's <c>FindAsync</c>.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>All matching documents, materialized into a list.</returns>
  public static async ETTask<List<T>> Query<T>(this DBComponent self, long taskId, Expression<Func<T, bool>> filter, string collection = null)
          where T : Entity
  {
      long lockKey = taskId % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          IAsyncCursor<T> matches = await documents.FindAsync(filter);
          return await matches.ToListAsync();
      }
  }
  /// <summary>
  /// For each collection name, fetches the document whose <c>Id</c> equals <paramref name="id"/>
  /// and appends it to <paramref name="result"/>.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="id">Document id; also used (modulo <c>DBComponent.TaskCount</c>) as the DB coroutine-lock key.</param>
  /// <param name="collectionNames">Collections to search; a null or empty list makes the method return immediately.</param>
  /// <param name="result">Output list; found entities are added in the order of <paramref name="collectionNames"/>.</param>
  /// <remarks>Collections with no matching document contribute nothing to <paramref name="result"/>.</remarks>
  public static async ETTask Query(this DBComponent self, long id, List<string> collectionNames, List<Entity> result)
  {
      bool nothingToQuery = collectionNames == null || collectionNames.Count == 0;
      if (nothingToQuery)
      {
          return;
      }

      long lockKey = id % DBComponent.TaskCount;

      // One lock acquisition covers the whole loop over collections.
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          foreach (string name in collectionNames)
          {
              IMongoCollection<Entity> documents = self.GetCollection(name);
              IAsyncCursor<Entity> matches = await documents.FindAsync(doc => doc.Id == id);
              Entity found = await matches.FirstOrDefaultAsync();
              if (found != null)
              {
                  result.Add(found);
              }
          }
      }
  }
  /// <summary>
  /// Returns every document of type <typeparamref name="T"/> matching a filter given as a JSON string.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="json">Filter text wrapped in a <see cref="JsonFilterDefinition{T}"/>.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>All matching documents, materialized into a list.</returns>
  /// <remarks>
  /// The DB coroutine-lock key is <c>RandomGenerator.RandInt64() % DBComponent.TaskCount</c>.
  /// </remarks>
  public static async ETTask<List<T>> QueryJson<T>(this DBComponent self, string json, string collection = null) where T : Entity
  {
      long lockKey = RandomGenerator.RandInt64() % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          FilterDefinition<T> jsonFilter = new JsonFilterDefinition<T>(json);
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          IAsyncCursor<T> matches = await documents.FindAsync(jsonFilter);
          return await matches.ToListAsync();
      }
  }
  /// <summary>
  /// Returns every document of type <typeparamref name="T"/> matching a filter given as a JSON string,
  /// using a caller-chosen key for the DB coroutine lock.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="taskId">Value reduced modulo <c>DBComponent.TaskCount</c> to form the lock key.</param>
  /// <param name="json">Filter text wrapped in a <see cref="JsonFilterDefinition{T}"/>.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>All matching documents, materialized into a list.</returns>
  public static async ETTask<List<T>> QueryJson<T>(this DBComponent self, long taskId, string json, string collection = null) where T : Entity
  {
      // Key the lock on taskId, like the other taskId overloads in this class. The previous
      // code ignored the parameter and used a random key, so callers could not pin this
      // query to the same lock key as their other operations.
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, taskId % DBComponent.TaskCount))
      {
          FilterDefinition<T> filterDefinition = new JsonFilterDefinition<T>(json);
          IAsyncCursor<T> cursor = await self.GetCollection<T>(collection).FindAsync(filterDefinition);
          return await cursor.ToListAsync();
      }
  }
  91. #endregion
  92. #region Insert
  /// <summary>
  /// Inserts all entities in <paramref name="list"/> with a single <c>InsertManyAsync</c> call.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="list">Entities to insert.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <remarks>
  /// The DB coroutine-lock key is <c>RandomGenerator.RandInt64() % DBComponent.TaskCount</c>.
  /// The target collection is accessed through its <see cref="Entity"/>-typed handle.
  /// </remarks>
  public static async ETTask InsertBatch<T>(this DBComponent self, IEnumerable<T> list, string collection = null) where T : Entity
  {
      string target = collection ?? typeof(T).Name;
      long lockKey = RandomGenerator.RandInt64() % DBComponent.TaskCount;

      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<Entity> documents = self.GetCollection(target);
          await documents.InsertManyAsync(list);
      }
  }
  104. #endregion
  105. #region Save
  /// <summary>
  /// Writes <paramref name="entity"/> by replacing the document with the same <c>Id</c>,
  /// with <c>IsUpsert</c> enabled on the replace options.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="entity">Entity to store; when null an error is logged and nothing is written.</param>
  /// <param name="collection">Collection name; null falls back to the runtime type name of <paramref name="entity"/>.</param>
  /// <remarks>The DB coroutine-lock key is <c>entity.Id % DBComponent.TaskCount</c>.</remarks>
  public static async ETTask Save<T>(this DBComponent self, T entity, string collection = null) where T : Entity
  {
      if (entity == null)
      {
          Log.Error($"save entity is null: {typeof (T).Name}");
          return;
      }

      // Default to the runtime type, which may be more derived than T.
      string target = collection ?? entity.GetType().Name;
      long lockKey = entity.Id % DBComponent.TaskCount;

      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<Entity> documents = self.GetCollection(target);
          ReplaceOptions upsert = new ReplaceOptions { IsUpsert = true };
          await documents.ReplaceOneAsync(doc => doc.Id == entity.Id, entity, upsert);
      }
  }
  /// <summary>
  /// Writes <paramref name="entity"/> by replacing the document with the same <c>Id</c>
  /// (with <c>IsUpsert</c> enabled), using a caller-chosen key for the DB coroutine lock.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="taskId">Value reduced modulo <c>DBComponent.TaskCount</c> to form the lock key.</param>
  /// <param name="entity">Entity to store; when null an error is logged and nothing is written.</param>
  /// <param name="collection">Collection name; null falls back to the runtime type name of <paramref name="entity"/>.</param>
  public static async ETTask Save<T>(this DBComponent self, long taskId, T entity, string collection = null) where T : Entity
  {
      if (entity == null)
      {
          Log.Error($"save entity is null: {typeof (T).Name}");
          return;
      }

      // Default to the runtime type, which may be more derived than T.
      string target = collection ?? entity.GetType().Name;
      long lockKey = taskId % DBComponent.TaskCount;

      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<Entity> documents = self.GetCollection(target);
          ReplaceOptions upsert = new ReplaceOptions { IsUpsert = true };
          await documents.ReplaceOneAsync(doc => doc.Id == entity.Id, entity, upsert);
      }
  }
  /// <summary>
  /// Writes each non-null entity in <paramref name="entities"/> to the collection named after
  /// its runtime type, replacing by <c>Id</c> with <c>IsUpsert</c> enabled.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="id">Value reduced modulo <c>DBComponent.TaskCount</c> to form the DB coroutine-lock key.</param>
  /// <param name="entities">Entities to store; a null list logs an error and returns, null elements are skipped.</param>
  /// <remarks>The entities are written one after another inside a single lock acquisition.</remarks>
  public static async ETTask Save(this DBComponent self, long id, List<Entity> entities)
  {
      if (entities == null)
      {
          Log.Error($"save entity is null");
          return;
      }

      long lockKey = id % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          foreach (Entity item in entities)
          {
              if (item != null)
              {
                  IMongoCollection<Entity> documents = self.GetCollection(item.GetType().Name);
                  ReplaceOptions upsert = new ReplaceOptions { IsUpsert = true };
                  await documents.ReplaceOneAsync(doc => doc.Id == item.Id, item, upsert);
              }
          }
      }
  }
  /// <summary>
  /// Saves <paramref name="entity"/>, choosing the <c>Save</c> overload based on <paramref name="taskId"/>.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="entity">Entity to store.</param>
  /// <param name="taskId">
  /// When 0, the overload without a task id is called (lock keyed on the entity's own id);
  /// otherwise the value is forwarded to the overload that takes a task id.
  /// </param>
  /// <param name="collection">Collection name, forwarded unchanged.</param>
  /// <remarks>
  /// NOTE(review): despite the name, this method awaits the underlying save before completing —
  /// confirm whether callers rely on that.
  /// </remarks>
  public static async ETTask SaveNotWait<T>(this DBComponent self, T entity, long taskId = 0, string collection = null) where T : Entity
  {
      if (taskId != 0)
      {
          await self.Save(taskId, entity, collection);
      }
      else
      {
          await self.Save(entity, collection);
      }
  }
  167. #endregion
  168. #region Remove
  /// <summary>
  /// Deletes one document of type <typeparamref name="T"/> whose <c>Id</c> equals <paramref name="id"/>.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="id">Document id; also used (modulo <c>DBComponent.TaskCount</c>) as the DB coroutine-lock key.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>The <c>DeletedCount</c> reported by the driver.</returns>
  public static async ETTask<long> Remove<T>(this DBComponent self, long id, string collection = null) where T : Entity
  {
      long lockKey = id % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          DeleteResult outcome = await documents.DeleteOneAsync(doc => doc.Id == id);
          return outcome.DeletedCount;
      }
  }
  /// <summary>
  /// Deletes one document of type <typeparamref name="T"/> whose <c>Id</c> equals <paramref name="id"/>,
  /// using a caller-chosen key for the DB coroutine lock.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="taskId">Value reduced modulo <c>DBComponent.TaskCount</c> to form the lock key.</param>
  /// <param name="id">Id of the document to delete.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>The <c>DeletedCount</c> reported by the driver.</returns>
  public static async ETTask<long> Remove<T>(this DBComponent self, long taskId, long id, string collection = null) where T : Entity
  {
      long lockKey = taskId % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          DeleteResult outcome = await documents.DeleteOneAsync(doc => doc.Id == id);
          return outcome.DeletedCount;
      }
  }
  /// <summary>
  /// Deletes every document of type <typeparamref name="T"/> that satisfies <paramref name="filter"/>.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="filter">Predicate expression handed to the driver's <c>DeleteManyAsync</c>.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>The <c>DeletedCount</c> reported by the driver.</returns>
  /// <remarks>
  /// The DB coroutine-lock key is <c>RandomGenerator.RandInt64() % DBComponent.TaskCount</c>.
  /// </remarks>
  public static async ETTask<long> Remove<T>(this DBComponent self, Expression<Func<T, bool>> filter, string collection = null) where T : Entity
  {
      long lockKey = RandomGenerator.RandInt64() % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          DeleteResult outcome = await documents.DeleteManyAsync(filter);
          return outcome.DeletedCount;
      }
  }
  /// <summary>
  /// Deletes every document of type <typeparamref name="T"/> that satisfies <paramref name="filter"/>,
  /// using a caller-chosen key for the DB coroutine lock.
  /// </summary>
  /// <param name="self">Database component.</param>
  /// <param name="taskId">Value reduced modulo <c>DBComponent.TaskCount</c> to form the lock key.</param>
  /// <param name="filter">Predicate expression handed to the driver's <c>DeleteManyAsync</c>.</param>
  /// <param name="collection">Collection name; null falls back to the type name of <typeparamref name="T"/>.</param>
  /// <returns>The <c>DeletedCount</c> reported by the driver.</returns>
  public static async ETTask<long> Remove<T>(this DBComponent self, long taskId, Expression<Func<T, bool>> filter, string collection = null)
          where T : Entity
  {
      long lockKey = taskId % DBComponent.TaskCount;
      using (await CoroutineLockComponent.Instance.Wait(CoroutineLockType.DB, lockKey))
      {
          IMongoCollection<T> documents = self.GetCollection<T>(collection);
          DeleteResult outcome = await documents.DeleteManyAsync(filter);
          return outcome.DeletedCount;
      }
  }
  202. #endregion
  203. }
  204. }